Compare commits

...

21 Commits

Author SHA1 Message Date
Kai Tamkun
b3105af730 Merge branch 'main' into kai/http-chunks 2025-04-17 14:49:16 -07:00
Kai Tamkun
5cd2799720 Fix test/regression/issue/04298/04298.test.ts 2025-04-16 18:31:52 -07:00
Kai Tamkun
d19c639184 Fix test-http-exceptions.js 2025-04-16 18:31:37 -07:00
Kai Tamkun
9a4d207563 Write separating \r\n 2025-04-16 16:49:11 -07:00
Kai Tamkun
1de23d1769 Split up node:http 2025-04-14 17:05:58 -07:00
Kai Tamkun
20a712c9c1 Move node:http Agent into _http_agent 2025-04-14 13:30:35 -07:00
Kai Tamkun
1d4cfb604c Merge branch 'main' into kai/http-chunks 2025-04-14 12:55:03 -07:00
Kai Tamkun
9c85ad389e test-http-outgoing-finish-writable.js 2025-04-10 21:05:50 -07:00
Kai Tamkun
e5af5a39a7 Merge branch 'main' into kai/http-chunks 2025-04-10 15:09:54 -07:00
Kai Tamkun
30e989fdc9 ERR_HTTP_HEADERS_SENT in setHeaders 2025-04-10 15:09:01 -07:00
Kai Tamkun
29c24c7343 Validate header values in setHeader 2025-04-07 19:38:21 -07:00
Kai Tamkun
d92488234a Emit node:http custom lookup errors on next tick 2025-04-07 16:54:00 -07:00
Kai Tamkun
a4c1c4a1f6 test-http-client-request-options.js 2025-04-07 16:36:36 -07:00
Kai Tamkun
60b99a4b58 Preserve empty strings in statusText 2025-04-07 15:12:40 -07:00
Kai Tamkun
ac856747de Fix invalid status code error formatting 2025-04-07 14:36:50 -07:00
Kai Tamkun
eadad0f548 Defer HTTP server timeouts added before a call to .listen() 2025-04-07 14:35:38 -07:00
Kai Tamkun
99ec99de5c Fix test-http-client-response-timeout.js 2025-04-04 21:16:51 -07:00
Kai Tamkun
e3cfcb41ce Obviate anyChunkWritten 2025-04-04 19:26:24 -07:00
Kai Tamkun
f88bca2d0c Fix newline before terminating chunk 2025-04-04 18:58:06 -07:00
Kai Tamkun
f728bff3c4 Merge branch 'main' into kai/http-chunks 2025-04-04 18:33:19 -07:00
Kai Tamkun
96a47de3cb Initial work on fixing chunked transfer encoding 2025-04-04 18:32:57 -07:00
22 changed files with 4448 additions and 3852 deletions

View File

@@ -112,9 +112,9 @@ public:
* This check also serves to limit writing the header only once. */
if ((httpResponseData->state & HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE) == 0) {
writeHeader("Connection", "close");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE;
Super::write("\r\n", 2);
}
httpResponseData->state |= HttpResponseData<SSL>::HTTP_CONNECTION_CLOSE;
}
/* if write was called and there was previously no Content-Length header set */
@@ -122,13 +122,11 @@ public:
/* We do not have tryWrite-like functionalities, so ignore optional in this path */
/* Write the chunked data if there is any (this will not send zero chunks) */
this->write(data, nullptr);
/* Terminating 0 chunk */
Super::write("\r\n0\r\n\r\n", 7);
Super::write("0\r\n\r\n", 5);
httpResponseData->markDone();
@@ -453,6 +451,7 @@ public:
writeMark();
writeHeader("Transfer-Encoding", "chunked");
Super::write("\r\n", 2);
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
}
@@ -462,6 +461,30 @@ public:
return internalEnd({nullptr, 0}, 0, false, false, closeConnection);
}
bool startBody() {
writeStatus(HTTP_200_OK);
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WROTE_CONTENT_LENGTH_HEADER) && !httpResponseData->fromAncientRequest) {
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
writeMark();
writeHeader("Transfer-Encoding", "chunked");
}
auto [written, failed] = Super::write("\r\n", 2);
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
return !failed;
} else if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
writeMark();
auto [written, failed] = Super::write("\r\n", 2);
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
return !failed;
}
return true;
}
/* Write parts of the response in chunking fashion. Starts timeout if failed. */
bool write(std::string_view data, size_t *writtenPtr = nullptr) {
writeStatus(HTTP_200_OK);
@@ -507,26 +530,27 @@ public:
}
return !has_failed;
}
HttpResponseData<SSL> *httpResponseData = getHttpResponseData();
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WROTE_CONTENT_LENGTH_HEADER) && !httpResponseData->fromAncientRequest) {
const bool is_chunked = !(httpResponseData->state & HttpResponseData<SSL>::HTTP_WROTE_CONTENT_LENGTH_HEADER) && !httpResponseData->fromAncientRequest;
if (is_chunked) {
if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
/* Write mark on first call to write */
writeMark();
writeHeader("Transfer-Encoding", "chunked");
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
Super::write("\r\n", 2);
}
Super::write("\r\n", 2);
writeUnsignedHex((unsigned int) data.length());
Super::write("\r\n", 2);
} else if (!(httpResponseData->state & HttpResponseData<SSL>::HTTP_WRITE_CALLED)) {
writeMark();
Super::write("\r\n", 2);
httpResponseData->state |= HttpResponseData<SSL>::HTTP_WRITE_CALLED;
Super::write("\r\n", 2);
}
size_t total_written = 0;
bool has_failed = false;
@@ -539,7 +563,7 @@ public:
has_failed = has_failed || failed;
total_written += written;
length -= INT_MAX;
data = data.substr(INT_MAX);
data.remove_prefix(INT_MAX);
}
// Handle the remaining data (less than INT_MAX bytes)
if (length > 0) {
@@ -548,14 +572,18 @@ public:
has_failed = has_failed || failed;
total_written += written;
}
/* Reset timeout on each sended chunk */
/* Reset timeout on each sent chunk */
this->resetTimeout();
if (writtenPtr) {
*writtenPtr = total_written;
}
if (is_chunked) {
Super::write("\r\n", 2);
}
/* If we did not fail the write, accept more */
return !has_failed;
}

View File

@@ -107,6 +107,10 @@ export default [
fn: "write",
length: 3,
},
startBody: {
fn: "startBody",
length: 0,
},
end: {
fn: "end",
length: 2,

View File

@@ -504,6 +504,20 @@ pub fn writeContinue(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject,
return .undefined;
}
pub fn startBody(this: *NodeHTTPResponse, globalObject: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
_ = callframe;
if (this.isDone()) {
return .undefined;
}
const state = this.raw_response.state();
try handleEndedIfNecessary(state, globalObject);
_ = this.raw_response.startBody();
return .undefined;
}
pub const AbortEvent = enum(u8) {
none = 0,
abort = 1,

View File

@@ -1313,6 +1313,7 @@ JSC_DEFINE_HOST_FUNCTION(jsFunctionRequireNativeModule, (JSGlobalObject * lexica
if (res.success)
return JSC::JSValue::encode(result);
}
RETURN_IF_EXCEPTION(throwScope, {});
ASSERT_WITH_MESSAGE(false, "Failed to fetch builtin module %s", specifier.utf8().data());
return throwVMError(globalObject, throwScope, "Failed to fetch builtin module"_s);
}

View File

@@ -1244,13 +1244,15 @@ JSC_DEFINE_HOST_FUNCTION(jsHTTPSetTimeout, (JSGlobalObject * globalObject, CallF
if (auto* jsRequest = jsDynamicCast<WebCore::JSRequest*>(requestValue)) {
Request__setTimeout(jsRequest->wrapped(), JSValue::encode(seconds), globalObject);
return JSValue::encode(jsBoolean(true));
}
if (auto* nodeHttpResponse = jsDynamicCast<WebCore::JSNodeHTTPResponse*>(requestValue)) {
NodeHTTPResponse__setTimeout(nodeHttpResponse->wrapped(), JSValue::encode(seconds), globalObject);
return JSValue::encode(jsBoolean(true));
}
return JSValue::encode(jsUndefined());
return JSValue::encode(jsBoolean(false));
}
JSC_DEFINE_HOST_FUNCTION(jsHTTPSetServerIdleTimeout, (JSGlobalObject * globalObject, CallFrame* callFrame))
{

View File

@@ -1312,6 +1312,18 @@ extern "C"
return uwsRes->getWriteOffset();
}
bool uws_res_start_body(int ssl, uws_res_r res) nonnull_fn_decl;
bool uws_res_start_body(int ssl, uws_res_r res)
{
if (ssl)
{
uWS::HttpResponse<true> *uwsRes = (uWS::HttpResponse<true> *)res;
return uwsRes->startBody();
}
uWS::HttpResponse<false> *uwsRes = (uWS::HttpResponse<false> *)res;
return uwsRes->startBody();
}
bool uws_res_has_responded(int ssl, uws_res_r res) nonnull_fn_decl;
bool uws_res_has_responded(int ssl, uws_res_r res)
{

View File

@@ -3115,6 +3115,18 @@ pub const AnyResponse = union(enum) {
};
}
pub fn startBody(this: AnyResponse) bool {
return switch (this) {
inline else => |resp| resp.startBody(),
};
}
pub fn writeRaw(this: AnyResponse, data: []const u8) void {
switch (this) {
inline else => |resp| resp.writeRaw(data),
}
}
pub fn end(this: AnyResponse, data: []const u8, close_connection: bool) void {
return switch (this) {
inline else => |resp| resp.end(data, close_connection),
@@ -3618,6 +3630,9 @@ pub fn NewApp(comptime ssl: bool) type {
pub fn getBufferedAmount(res: *Response) u64 {
return uws_res_get_buffered_amount(ssl_flag, res.downcast());
}
pub fn startBody(res: *Response) bool {
return uws_res_start_body(ssl_flag, res.downcast());
}
pub fn write(res: *Response, data: []const u8) WriteResult {
var len: usize = data.len;
return switch (uws_res_write(ssl_flag, res.downcast(), data.ptr, &len)) {
@@ -3991,6 +4006,7 @@ extern fn uws_res_get_buffered_amount(ssl: i32, res: *uws_res) u64;
extern fn uws_res_write(ssl: i32, res: *uws_res, data: ?[*]const u8, length: *usize) bool;
extern fn uws_res_get_write_offset(ssl: i32, res: *uws_res) u64;
extern fn uws_res_override_write_offset(ssl: i32, res: *uws_res, u64) void;
extern fn uws_res_start_body(ssl: i32, res: *uws_res) bool;
extern fn uws_res_has_responded(ssl: i32, res: *uws_res) bool;
extern fn uws_res_on_writable(ssl: i32, res: *uws_res, handler: ?*const fn (*uws_res, u64, ?*anyopaque) callconv(.C) bool, user_data: ?*anyopaque) void;
extern fn uws_res_clear_on_writable(ssl: i32, res: *uws_res) void;

View File

@@ -1,3 +1,440 @@
const kDeprecatedReplySymbol = Symbol("deprecatedReply");
const { checkIsHttpToken } = require("internal/validators");
const { isTypedArray, isArrayBuffer } = require("node:util/types");
export default { kDeprecatedReplySymbol };
const kDeprecatedReplySymbol = Symbol("deprecatedReply");
const kBodyChunks = Symbol("bodyChunks");
const kPath = Symbol("path");
const kPort = Symbol("port");
const kMethod = Symbol("method");
const kHost = Symbol("host");
const kProtocol = Symbol("protocol");
const kAgent = Symbol("agent");
const kFetchRequest = Symbol("fetchRequest");
const kTls = Symbol("tls");
const kUseDefaultPort = Symbol("useDefaultPort");
const kRes = Symbol("res");
const kUpgradeOrConnect = Symbol("upgradeOrConnect");
const kParser = Symbol("parser");
const kMaxHeadersCount = Symbol("maxHeadersCount");
const kReusedSocket = Symbol("reusedSocket");
const kTimeoutTimer = Symbol("timeoutTimer");
const kOptions = Symbol("options");
const kSocketPath = Symbol("socketPath");
const kSignal = Symbol("signal");
const kMaxHeaderSize = Symbol("maxHeaderSize");
const kJoinDuplicateHeaders = Symbol("joinDuplicateHeaders");
const abortedSymbol = Symbol("aborted");
const kClearTimeout = Symbol("kClearTimeout");
const headerStateSymbol = Symbol("headerState");
// used for pretending to emit events in the right order
const kEmitState = Symbol("emitState");
const bodyStreamSymbol = Symbol("bodyStream");
const controllerSymbol = Symbol("controller");
const runSymbol = Symbol("run");
const deferredSymbol = Symbol("deferred");
const eofInProgress = Symbol("eofInProgress");
const fakeSocketSymbol = Symbol("fakeSocket");
const firstWriteSymbol = Symbol("firstWrite");
const headersSymbol = Symbol("headers");
const isTlsSymbol = Symbol("is_tls");
const kHandle = Symbol("handle");
const kRealListen = Symbol("kRealListen");
const noBodySymbol = Symbol("noBody");
const optionsSymbol = Symbol("options");
const reqSymbol = Symbol("req");
const timeoutTimerSymbol = Symbol("timeoutTimer");
const tlsSymbol = Symbol("tls");
const typeSymbol = Symbol("type");
const webRequestOrResponse = Symbol("FetchAPI");
const statusCodeSymbol = Symbol("statusCode");
const kAbortController = Symbol.for("kAbortController");
const statusMessageSymbol = Symbol("statusMessage");
const kInternalSocketData = Symbol.for("::bunternal::");
const serverSymbol = Symbol.for("::bunternal::");
const kPendingCallbacks = Symbol("pendingCallbacks");
const kRequest = Symbol("request");
const kCloseCallback = Symbol("closeCallback");
const kDeferredTimeouts = Symbol("deferredTimeouts");
const RegExpPrototypeExec = RegExp.prototype.exec;
const kEmptyObject = Object.freeze(Object.create(null));
export const enum ClientRequestEmitState {
socket = 1,
prefinish = 2,
finish = 3,
response = 4,
}
export const enum NodeHTTPResponseAbortEvent {
none = 0,
abort = 1,
timeout = 2,
}
export const enum NodeHTTPIncomingRequestType {
FetchRequest,
FetchResponse,
NodeHTTPResponse,
}
export const enum NodeHTTPBodyReadState {
none,
pending = 1 << 1,
done = 1 << 2,
hasBufferedDataDuringPause = 1 << 3,
}
// Must be kept in sync with NodeHTTPResponse.Flags
export const enum NodeHTTPResponseFlags {
socket_closed = 1 << 0,
request_has_completed = 1 << 1,
closed_or_completed = socket_closed | request_has_completed,
}
export const enum NodeHTTPHeaderState {
none,
assigned,
sent,
}
function emitErrorNextTickIfErrorListenerNT(self, err, cb) {
process.nextTick(emitErrorNextTickIfErrorListener, self, err, cb);
}
function emitErrorNextTickIfErrorListener(self, err, cb) {
if ($isCallable(cb)) {
// This is to keep backward compatible behavior.
// An error is emitted only if there are listeners attached to the event.
if (self.listenerCount("error") == 0) {
cb();
} else {
cb(err);
}
}
}
const headerCharRegex = /[^\t\x20-\x7e\x80-\xff]/;
/**
* True if val contains an invalid field-vchar
* field-value = *( field-content / obs-fold )
* field-content = field-vchar [ 1*( SP / HTAB ) field-vchar ]
* field-vchar = VCHAR / obs-text
*/
function checkInvalidHeaderChar(val: string) {
return RegExpPrototypeExec.$call(headerCharRegex, val) !== null;
}
const validateHeaderName = (name, label?) => {
if (typeof name !== "string" || !name || !checkIsHttpToken(name)) {
throw $ERR_INVALID_HTTP_TOKEN(label || "Header name", name);
}
};
const validateHeaderValue = (name, value) => {
if (value === undefined) {
throw $ERR_HTTP_INVALID_HEADER_VALUE(value, name);
}
if (checkInvalidHeaderChar(value)) {
throw $ERR_INVALID_CHAR("header content", name);
}
};
// TODO: make this more robust.
function isAbortError(err) {
return err?.name === "AbortError";
}
// This lets us skip some URL parsing
let isNextIncomingMessageHTTPS = false;
function getIsNextIncomingMessageHTTPS() {
return isNextIncomingMessageHTTPS;
}
function setIsNextIncomingMessageHTTPS(value) {
isNextIncomingMessageHTTPS = value;
}
function callCloseCallback(self) {
if (self[kCloseCallback]) {
self[kCloseCallback]();
self[kCloseCallback] = undefined;
}
}
function emitCloseNT(self) {
if (!self._closed) {
self.destroyed = true;
self._closed = true;
callCloseCallback(self);
self.emit("close");
}
}
function emitCloseNTAndComplete(self) {
if (!self._closed) {
self._closed = true;
callCloseCallback(self);
self.emit("close");
}
self.complete = true;
}
function emitEOFIncomingMessageOuter(self) {
self.push(null);
self.complete = true;
}
function emitEOFIncomingMessage(self) {
self[eofInProgress] = true;
process.nextTick(emitEOFIncomingMessageOuter, self);
}
function validateMsecs(numberlike: any, field: string) {
if (typeof numberlike !== "number" || numberlike < 0) {
throw $ERR_INVALID_ARG_TYPE(field, "number", numberlike);
}
return numberlike;
}
function isValidTLSArray(obj) {
if (typeof obj === "string" || isTypedArray(obj) || isArrayBuffer(obj) || $inheritsBlob(obj)) return true;
if (Array.isArray(obj)) {
for (var i = 0; i < obj.length; i++) {
const item = obj[i];
if (typeof item !== "string" && !isTypedArray(item) && !isArrayBuffer(item) && !$inheritsBlob(item)) return false; // prettier-ignore
}
return true;
}
return false;
}
class ConnResetException extends Error {
constructor(msg) {
super(msg);
this.code = "ECONNRESET";
this.name = "ConnResetException";
}
}
const METHODS = [
"ACL",
"BIND",
"CHECKOUT",
"CONNECT",
"COPY",
"DELETE",
"GET",
"HEAD",
"LINK",
"LOCK",
"M-SEARCH",
"MERGE",
"MKACTIVITY",
"MKCALENDAR",
"MKCOL",
"MOVE",
"NOTIFY",
"OPTIONS",
"PATCH",
"POST",
"PROPFIND",
"PROPPATCH",
"PURGE",
"PUT",
"QUERY",
"REBIND",
"REPORT",
"SEARCH",
"SOURCE",
"SUBSCRIBE",
"TRACE",
"UNBIND",
"UNLINK",
"UNLOCK",
"UNSUBSCRIBE",
];
const STATUS_CODES = {
100: "Continue",
101: "Switching Protocols",
102: "Processing",
103: "Early Hints",
200: "OK",
201: "Created",
202: "Accepted",
203: "Non-Authoritative Information",
204: "No Content",
205: "Reset Content",
206: "Partial Content",
207: "Multi-Status",
208: "Already Reported",
226: "IM Used",
300: "Multiple Choices",
301: "Moved Permanently",
302: "Found",
303: "See Other",
304: "Not Modified",
305: "Use Proxy",
307: "Temporary Redirect",
308: "Permanent Redirect",
400: "Bad Request",
401: "Unauthorized",
402: "Payment Required",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
406: "Not Acceptable",
407: "Proxy Authentication Required",
408: "Request Timeout",
409: "Conflict",
410: "Gone",
411: "Length Required",
412: "Precondition Failed",
413: "Payload Too Large",
414: "URI Too Long",
415: "Unsupported Media Type",
416: "Range Not Satisfiable",
417: "Expectation Failed",
418: "I'm a Teapot",
421: "Misdirected Request",
422: "Unprocessable Entity",
423: "Locked",
424: "Failed Dependency",
425: "Too Early",
426: "Upgrade Required",
428: "Precondition Required",
429: "Too Many Requests",
431: "Request Header Fields Too Large",
451: "Unavailable For Legal Reasons",
500: "Internal Server Error",
501: "Not Implemented",
502: "Bad Gateway",
503: "Service Unavailable",
504: "Gateway Timeout",
505: "HTTP Version Not Supported",
506: "Variant Also Negotiates",
507: "Insufficient Storage",
508: "Loop Detected",
509: "Bandwidth Limit Exceeded",
510: "Not Extended",
511: "Network Authentication Required",
};
function hasServerResponseFinished(self, chunk, callback) {
const finished = self.finished;
if (chunk) {
const destroyed = self.destroyed;
if (finished || destroyed) {
let err;
if (finished) {
err = $ERR_STREAM_WRITE_AFTER_END();
} else if (destroyed) {
err = $ERR_STREAM_DESTROYED("Stream is destroyed");
}
if (!destroyed) {
process.nextTick(emitErrorNt, self, err, callback);
} else if ($isCallable(callback)) {
process.nextTick(callback, err);
}
return true;
}
} else if (finished) {
if ($isCallable(callback)) {
if (!self.writableFinished) {
self.on("finish", callback);
} else {
callback($ERR_STREAM_ALREADY_FINISHED("end"));
}
}
return true;
}
return false;
}
function emitErrorNt(msg, err, callback) {
if ($isCallable(callback)) {
callback(err);
}
if ($isCallable(msg.emit) && !msg.destroyed) {
msg.emit("error", err);
}
}
export {
kDeprecatedReplySymbol,
kBodyChunks,
kPath,
kPort,
kMethod,
kHost,
kProtocol,
kAgent,
kFetchRequest,
kTls,
kUseDefaultPort,
kRes,
kUpgradeOrConnect,
kParser,
kMaxHeadersCount,
kReusedSocket,
kTimeoutTimer,
kOptions,
kSocketPath,
kSignal,
kMaxHeaderSize,
kJoinDuplicateHeaders,
abortedSymbol,
kClearTimeout,
emitErrorNextTickIfErrorListenerNT,
headerStateSymbol,
kEmitState,
bodyStreamSymbol,
controllerSymbol,
runSymbol,
deferredSymbol,
eofInProgress,
fakeSocketSymbol,
firstWriteSymbol,
headersSymbol,
isTlsSymbol,
kHandle,
kRealListen,
noBodySymbol,
optionsSymbol,
reqSymbol,
timeoutTimerSymbol,
tlsSymbol,
typeSymbol,
webRequestOrResponse,
statusCodeSymbol,
kAbortController,
statusMessageSymbol,
kInternalSocketData,
serverSymbol,
kPendingCallbacks,
kRequest,
kCloseCallback,
kDeferredTimeouts,
validateHeaderName,
validateHeaderValue,
isAbortError,
kEmptyObject,
getIsNextIncomingMessageHTTPS,
setIsNextIncomingMessageHTTPS,
callCloseCallback,
emitCloseNT,
emitCloseNTAndComplete,
emitEOFIncomingMessage,
validateMsecs,
isValidTLSArray,
ConnResetException,
METHODS,
STATUS_CODES,
hasServerResponseFinished,
};

View File

@@ -9,6 +9,9 @@ function urlToHttpOptions(url) {
path: `${url.pathname || ""}${url.search || ""}`,
href: url.href,
};
if (url.headers) {
options.headers = url.headers;
}
if (url.port !== "") {
options.port = Number(url.port);
}

144
src/js/node/_http_agent.ts Normal file
View File

@@ -0,0 +1,144 @@
const EventEmitter: typeof import("node:events").EventEmitter = require("node:events");
const { kEmptyObject } = require("internal/http");
const { FakeSocket } = require("node:_http_outgoing");
const ObjectDefineProperty = Object.defineProperty;
const kfakeSocket = Symbol("kfakeSocket");
const NODE_HTTP_WARNING =
"WARN: Agent is mostly unused in Bun's implementation of http. If you see strange behavior, this is probably the cause.";
// Define Agent interface
interface Agent extends InstanceType<typeof EventEmitter> {
defaultPort: number;
protocol: string;
options: any;
requests: Record<string, any>;
sockets: Record<string, any>;
freeSockets: Record<string, any>;
keepAliveMsecs: number;
keepAlive: boolean;
maxSockets: number;
maxFreeSockets: number;
scheduling: string;
maxTotalSockets: any;
totalSocketCount: number;
[kfakeSocket]?: any;
createConnection(): any;
getName(options?: any): string;
addRequest(): void;
createSocket(req: any, options: any, cb: (err: any, socket: any) => void): void;
removeSocket(): void;
keepSocketAlive(): boolean;
reuseSocket(): void;
destroy(): void;
}
// Define the constructor interface
interface AgentConstructor {
new (options?: any): Agent;
(options?: any): Agent;
defaultMaxSockets: number;
globalAgent: Agent;
prototype: Agent;
}
function Agent(options = kEmptyObject) {
if (!(this instanceof Agent)) return new Agent(options);
EventEmitter.$apply(this, []);
this.defaultPort = 80;
this.protocol = "http:";
this.options = options = { ...options, path: null };
if (options.noDelay === undefined) options.noDelay = true;
// Don't confuse net and make it think that we're connecting to a pipe
this.requests = Object.create(null);
this.sockets = Object.create(null);
this.freeSockets = Object.create(null);
this.keepAliveMsecs = options.keepAliveMsecs || 1000;
this.keepAlive = options.keepAlive || false;
this.maxSockets = options.maxSockets || Agent.defaultMaxSockets;
this.maxFreeSockets = options.maxFreeSockets || 256;
this.scheduling = options.scheduling || "lifo";
this.maxTotalSockets = options.maxTotalSockets;
this.totalSocketCount = 0;
this.defaultPort = options.defaultPort || 80;
this.protocol = options.protocol || "http:";
}
$toClass(Agent, "Agent", EventEmitter);
// Type assertion to help TypeScript understand Agent has static properties
const AgentClass = Agent as unknown as AgentConstructor;
ObjectDefineProperty(AgentClass, "globalAgent", {
get: function () {
return globalAgent;
},
});
ObjectDefineProperty(AgentClass, "defaultMaxSockets", {
get: function () {
return Infinity;
},
});
Agent.prototype.createConnection = function () {
$debug(`${NODE_HTTP_WARNING}\n`, "WARN: Agent.createConnection is a no-op, returns fake socket");
return (this[kfakeSocket] ??= new FakeSocket());
};
Agent.prototype.getName = function (options = kEmptyObject) {
let name = `http:${options.host || "localhost"}:`;
if (options.port) name += options.port;
name += ":";
if (options.localAddress) name += options.localAddress;
// Pacify parallel/test-http-agent-getname by only appending
// the ':' when options.family is set.
if (options.family === 4 || options.family === 6) name += `:${options.family}`;
if (options.socketPath) name += `:${options.socketPath}`;
return name;
};
Agent.prototype.addRequest = function () {
$debug(`${NODE_HTTP_WARNING}\n`, "WARN: Agent.addRequest is a no-op");
};
Agent.prototype.createSocket = function (req, options, cb) {
$debug(`${NODE_HTTP_WARNING}\n`, "WARN: Agent.createSocket returns fake socket");
cb(null, (this[kfakeSocket] ??= new FakeSocket()));
};
Agent.prototype.removeSocket = function () {
$debug(`${NODE_HTTP_WARNING}\n`, "WARN: Agent.removeSocket is a no-op");
};
Agent.prototype.keepSocketAlive = function () {
$debug(`${NODE_HTTP_WARNING}\n`, "WARN: Agent.keepSocketAlive is a no-op");
return true;
};
Agent.prototype.reuseSocket = function () {
$debug(`${NODE_HTTP_WARNING}\n`, "WARN: Agent.reuseSocket is a no-op");
};
Agent.prototype.destroy = function () {
$debug(`${NODE_HTTP_WARNING}\n`, "WARN: Agent.destroy is a no-op");
};
var globalAgent = new Agent();
const http_agent_exports = {
Agent: AgentClass,
globalAgent,
NODE_HTTP_WARNING,
};
export default http_agent_exports;

970
src/js/node/_http_client.ts Normal file
View File

@@ -0,0 +1,970 @@
const { isIP, isIPv6 } = require("node:net");
const { checkIsHttpToken, validateFunction } = require("internal/validators");
const { urlToHttpOptions } = require("internal/url");
const {
kBodyChunks,
abortedSymbol,
kClearTimeout,
emitErrorNextTickIfErrorListenerNT,
isAbortError,
kTls,
kAbortController,
kMethod,
kAgent,
kProtocol,
kPath,
kUseDefaultPort,
kHost,
kPort,
kSocketPath,
kFetchRequest,
kRes,
kUpgradeOrConnect,
kParser,
kMaxHeaderSize,
kMaxHeadersCount,
kReusedSocket,
kOptions,
kJoinDuplicateHeaders,
kTimeoutTimer,
kEmitState,
ClientRequestEmitState,
kSignal,
kEmptyObject,
getIsNextIncomingMessageHTTPS,
setIsNextIncomingMessageHTTPS,
typeSymbol,
NodeHTTPIncomingRequestType,
reqSymbol,
callCloseCallback,
emitCloseNTAndComplete,
validateMsecs,
isValidTLSArray,
ConnResetException,
} = require("internal/http");
const { Agent, NODE_HTTP_WARNING } = require("node:_http_agent");
const { IncomingMessage } = require("node:_http_incoming");
const { OutgoingMessage } = require("node:_http_outgoing");
const globalReportError = globalThis.reportError;
const setTimeout = globalThis.setTimeout;
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/;
const fetch = Bun.fetch;
const { URL } = globalThis;
// Primordials
const ObjectAssign = Object.assign;
const RegExpPrototypeExec = RegExp.prototype.exec;
const StringPrototypeToUpperCase = String.prototype.toUpperCase;
function ClientRequest(input, options, cb) {
if (!(this instanceof ClientRequest)) {
return new (ClientRequest as any)(input, options, cb);
}
this.write = (chunk, encoding, callback) => {
if (this.destroyed) return false;
if ($isCallable(chunk)) {
callback = chunk;
chunk = undefined;
encoding = undefined;
} else if ($isCallable(encoding)) {
callback = encoding;
encoding = undefined;
} else if (!$isCallable(callback)) {
callback = undefined;
}
return write_(chunk, encoding, callback);
};
let writeCount = 0;
let resolveNextChunk: ((end: boolean) => void) | undefined = end => {};
const pushChunk = chunk => {
this[kBodyChunks].push(chunk);
if (writeCount > 1) {
startFetch();
}
resolveNextChunk?.(false);
};
const write_ = (chunk, encoding, callback) => {
const MAX_FAKE_BACKPRESSURE_SIZE = 1024 * 1024;
const canSkipReEncodingData =
// UTF-8 string:
(typeof chunk === "string" && (encoding === "utf-8" || encoding === "utf8" || !encoding)) ||
// Buffer
($isTypedArrayView(chunk) && (!encoding || encoding === "buffer" || encoding === "utf-8"));
let bodySize = 0;
if (!canSkipReEncodingData) {
chunk = Buffer.from(chunk, encoding);
}
bodySize = chunk.length;
writeCount++;
if (!this[kBodyChunks]) {
this[kBodyChunks] = [];
pushChunk(chunk);
if (callback) callback();
return true;
}
// Signal fake backpressure if the body size is > 1024 * 1024
// So that code which loops forever until backpressure is signaled
// will eventually exit.
for (let chunk of this[kBodyChunks]) {
bodySize += chunk.length;
if (bodySize >= MAX_FAKE_BACKPRESSURE_SIZE) {
break;
}
}
pushChunk(chunk);
if (callback) callback();
return bodySize < MAX_FAKE_BACKPRESSURE_SIZE;
};
const oldEnd = this.end;
this.end = function (chunk, encoding, callback) {
oldEnd?.$call(this, chunk, encoding, callback);
if ($isCallable(chunk)) {
callback = chunk;
chunk = undefined;
encoding = undefined;
} else if ($isCallable(encoding)) {
callback = encoding;
encoding = undefined;
} else if (!$isCallable(callback)) {
callback = undefined;
}
if (chunk) {
if (this.finished) {
emitErrorNextTickIfErrorListenerNT(this, $ERR_STREAM_WRITE_AFTER_END(), callback);
return this;
}
write_(chunk, encoding, null);
} else if (this.finished) {
if (callback) {
if (!this.writableFinished) {
this.on("finish", callback);
} else {
callback($ERR_STREAM_ALREADY_FINISHED("end"));
}
}
}
if (callback) {
this.once("finish", callback);
}
if (!this.finished) {
send();
resolveNextChunk?.(true);
}
return this;
};
this.destroy = function (err?: Error) {
if (this.destroyed) return this;
this.destroyed = true;
const res = this.res;
// If we're aborting, we don't care about any more response data.
if (res) {
res._dump();
}
this.finished = true;
if (this.res && !this.res.complete) {
this.res.emit("end");
}
// If request is destroyed we abort the current response
this[kAbortController]?.abort?.();
this.socket.destroy(err);
return this;
};
this._ensureTls = () => {
if (this[kTls] === null) this[kTls] = {};
return this[kTls];
};
const socketCloseListener = () => {
this.destroyed = true;
const res = this.res;
if (res) {
// Socket closed before we emitted 'end' below.
if (!res.complete) {
res.destroy(new ConnResetException("aborted"));
}
if (!this._closed) {
this._closed = true;
callCloseCallback(this);
this.emit("close");
}
if (!res.aborted && res.readable) {
res.push(null);
}
} else if (!this._closed) {
this._closed = true;
callCloseCallback(this);
this.emit("close");
}
};
const onAbort = (err?: Error) => {
this[kClearTimeout]?.();
socketCloseListener();
if (!this[abortedSymbol]) {
process.nextTick(emitAbortNextTick, this);
this[abortedSymbol] = true;
}
};
let fetching = false;
const startFetch = (customBody?) => {
if (fetching) {
return false;
}
fetching = true;
const method = this[kMethod];
let keepalive = true;
const agentKeepalive = this[kAgent]?.keepalive;
if (agentKeepalive !== undefined) {
keepalive = agentKeepalive;
}
const protocol = this[kProtocol];
const path = this[kPath];
let host = this[kHost];
const getURL = host => {
if (isIPv6(host)) {
host = `[${host}]`;
}
if (path.startsWith("http://") || path.startsWith("https://")) {
return [path, `${protocol}//${host}${this[kUseDefaultPort] ? "" : ":" + this[kPort]}`];
} else {
let proxy: string | undefined;
const url = `${protocol}//${host}${this[kUseDefaultPort] ? "" : ":" + this[kPort]}${path}`;
// support agent proxy url/string for http/https
try {
// getters can throw
const agentProxy = this[kAgent]?.proxy;
// this should work for URL like objects and strings
proxy = agentProxy?.href || agentProxy;
} catch {}
return [url, proxy];
}
};
const go = (url, proxy, softFail = false) => {
const tls =
protocol === "https:" && this[kTls] ? { ...this[kTls], serverName: this[kTls].servername } : undefined;
const fetchOptions: any = {
method,
headers: this.getHeaders(),
redirect: "manual",
signal: this[kAbortController]?.signal,
// Timeouts are handled via this.setTimeout.
timeout: false,
// Disable auto gzip/deflate
decompress: false,
keepalive,
};
let keepOpen = false;
// no body and not finished
const isDuplex = customBody === undefined && !this.finished;
if (isDuplex) {
fetchOptions.duplex = "half";
keepOpen = true;
}
if (method !== "GET" && method !== "HEAD" && method !== "OPTIONS") {
const self = this;
if (customBody !== undefined) {
fetchOptions.body = customBody;
} else if (isDuplex) {
fetchOptions.body = async function* () {
while (self[kBodyChunks]?.length > 0) {
yield self[kBodyChunks].shift();
}
if (self[kBodyChunks]?.length === 0) {
self.emit("drain");
}
while (!self.finished) {
yield await new Promise(resolve => {
resolveNextChunk = end => {
resolveNextChunk = undefined;
if (end) {
resolve(undefined);
} else {
resolve(self[kBodyChunks].shift());
}
};
});
if (self[kBodyChunks]?.length === 0) {
self.emit("drain");
}
}
handleResponse?.();
};
}
}
if (tls) {
fetchOptions.tls = tls;
}
if (!!$debug) {
fetchOptions.verbose = true;
}
if (proxy) {
fetchOptions.proxy = proxy;
}
const socketPath = this[kSocketPath];
if (socketPath) {
fetchOptions.unix = socketPath;
}
//@ts-ignore
this[kFetchRequest] = fetch(url, fetchOptions).then(response => {
if (this.aborted) {
maybeEmitClose();
return;
}
handleResponse = () => {
this[kFetchRequest] = null;
this[kClearTimeout]();
handleResponse = undefined;
const prevIsHTTPS = getIsNextIncomingMessageHTTPS();
setIsNextIncomingMessageHTTPS(response.url.startsWith("https:"));
var res = (this.res = new IncomingMessage(response, {
[typeSymbol]: NodeHTTPIncomingRequestType.FetchResponse,
[reqSymbol]: this,
}));
setIsNextIncomingMessageHTTPS(prevIsHTTPS);
res.req = this;
let timer;
response.setTimeout = (msecs, callback) => {
if (timer) {
clearTimeout(timer);
}
timer = setTimeout(() => {
if (res.complete) {
return;
}
res.emit("timeout");
callback?.();
}, msecs);
};
process.nextTick(
(self, res) => {
// If the user did not listen for the 'response' event, then they
// can't possibly read the data, so we ._dump() it into the void
// so that the socket doesn't hang there in a paused state.
if (self.aborted || !self.emit("response", res)) {
res._dump();
}
},
this,
res,
);
maybeEmitClose();
if (res.statusCode === 304) {
res.complete = true;
maybeEmitClose();
return;
}
};
if (!keepOpen) {
handleResponse();
}
onEnd();
});
if (!softFail) {
// Don't emit an error if we're iterating over multiple possible addresses and we haven't reached the end yet.
// This is for the happy eyeballs implementation.
this[kFetchRequest]
.catch(err => {
// Node treats AbortError separately.
// The "abort" listener on the abort controller should have called this
if (isAbortError(err)) {
return;
}
if (!!$debug) globalReportError(err);
try {
this.emit("error", err);
} catch (e) {}
})
.finally(() => {
if (!keepOpen) {
fetching = false;
this[kFetchRequest] = null;
this[kClearTimeout]();
}
});
}
return this[kFetchRequest];
};
if (isIP(host) || !options.lookup) {
// Don't need to bother with lookup if it's already an IP address or no lookup function is provided.
const [url, proxy] = getURL(host);
go(url, proxy, false);
return true;
}
try {
options.lookup(host, { all: true }, (err, results) => {
if (err) {
if (!!$debug) globalReportError(err);
process.nextTick((self, err) => self.emit("error", err), this, err);
return;
}
let candidates = results.sort((a, b) => b.family - a.family); // prefer IPv6
const fail = (message, name, code, syscall) => {
const error = new Error(message);
error.name = name;
error.code = code;
error.syscall = syscall;
if (!!$debug) globalReportError(error);
process.nextTick((self, err) => self.emit("error", err), this, error);
};
if (candidates.length === 0) {
fail("No records found", "DNSException", "ENOTFOUND", "getaddrinfo");
return;
}
if (!this.hasHeader("Host")) {
this.setHeader("Host", `${host}:${port}`);
}
// We want to try all possible addresses, beginning with the IPv6 ones, until one succeeds.
// All addresses except for the last are allowed to "soft fail" -- instead of reporting
// an error to the user, we'll just skip to the next address.
// The last address is required to work, and if it fails we'll throw an error.
const iterate = () => {
if (candidates.length === 0) {
// If we get to this point, it means that none of the addresses could be connected to.
fail(`connect ECONNREFUSED ${host}:${port}`, "Error", "ECONNREFUSED", "connect");
return;
}
const [url, proxy] = getURL(candidates.shift().address);
go(url, proxy, candidates.length > 0).catch(iterate);
};
iterate();
});
return true;
} catch (err) {
if (!!$debug) globalReportError(err);
process.nextTick((self, err) => self.emit("error", err), this, err);
return false;
}
};
let onEnd = () => {};
let handleResponse: (() => void) | undefined = () => {};
const send = () => {
this.finished = true;
this[kAbortController] = new AbortController();
this[kAbortController].signal.addEventListener("abort", onAbort, { once: true });
var body = this[kBodyChunks] && this[kBodyChunks].length > 1 ? new Blob(this[kBodyChunks]) : this[kBodyChunks]?.[0];
try {
startFetch(body);
onEnd = () => {
handleResponse?.();
};
} catch (err) {
if (!!$debug) globalReportError(err);
this.emit("error", err);
} finally {
process.nextTick(maybeEmitFinish.bind(this));
}
};
// --- For faking the events in the right order ---
const maybeEmitSocket = () => {
if (!(this[kEmitState] & (1 << ClientRequestEmitState.socket))) {
this[kEmitState] |= 1 << ClientRequestEmitState.socket;
this.emit("socket", this.socket);
}
};
const maybeEmitPrefinish = () => {
maybeEmitSocket();
if (!(this[kEmitState] & (1 << ClientRequestEmitState.prefinish))) {
this[kEmitState] |= 1 << ClientRequestEmitState.prefinish;
this.emit("prefinish");
}
};
const maybeEmitFinish = () => {
maybeEmitPrefinish();
if (!(this[kEmitState] & (1 << ClientRequestEmitState.finish))) {
this[kEmitState] |= 1 << ClientRequestEmitState.finish;
this.emit("finish");
}
};
const maybeEmitClose = () => {
maybeEmitPrefinish();
if (!this._closed) {
process.nextTick(emitCloseNTAndComplete, this);
}
};
this.abort = () => {
if (this.aborted) return;
this[abortedSymbol] = true;
process.nextTick(emitAbortNextTick, this);
this[kAbortController]?.abort?.();
this.destroy();
};
if (typeof input === "string") {
const urlStr = input;
try {
var urlObject = new URL(urlStr);
} catch (e) {
throw $ERR_INVALID_URL(`Invalid URL: ${urlStr}`);
}
input = urlToHttpOptions(urlObject);
} else if (input && typeof input === "object" && input instanceof URL) {
// url.URL instance
input = urlToHttpOptions(input);
} else {
cb = options;
options = input;
input = null;
}
if (typeof options === "function") {
cb = options;
options = input || kEmptyObject;
} else {
options = ObjectAssign(input || {}, options);
}
this[kTls] = null;
this[kAbortController] = null;
let agent = options.agent;
const defaultAgent = options._defaultAgent || Agent.globalAgent;
if (agent === false) {
agent = new defaultAgent.constructor();
} else if (agent == null) {
agent = defaultAgent;
} else if (typeof agent.addRequest !== "function") {
throw $ERR_INVALID_ARG_TYPE("options.agent", "Agent-like Object, undefined, or false", agent);
}
this[kAgent] = agent;
this.destroyed = false;
const protocol = options.protocol || defaultAgent.protocol;
let expectedProtocol = defaultAgent.protocol;
if (this.agent.protocol) {
expectedProtocol = this.agent.protocol;
}
if (protocol !== expectedProtocol) {
throw $ERR_INVALID_PROTOCOL(protocol, expectedProtocol);
}
this[kProtocol] = protocol;
if (options.path) {
const path = String(options.path);
if (RegExpPrototypeExec.$call(INVALID_PATH_REGEX, path) !== null) {
throw $ERR_UNESCAPED_CHARACTERS("Request path");
}
}
const defaultPort = options.defaultPort || this[kAgent].defaultPort;
const port = (this[kPort] = options.port || defaultPort || 80);
this[kUseDefaultPort] = this[kPort] === defaultPort;
const host =
(this[kHost] =
options.host =
validateHost(options.hostname, "hostname") || validateHost(options.host, "host") || "localhost");
const setHost = options.setHost === undefined || Boolean(options.setHost);
this[kSocketPath] = options.socketPath;
const signal = options.signal;
if (signal) {
//We still want to control abort function and timeout so signal call our AbortController
signal.addEventListener(
"abort",
() => {
this[kAbortController]?.abort();
},
{ once: true },
);
this[kSignal] = signal;
}
let method = options.method;
const methodIsString = typeof method === "string";
if (method !== null && method !== undefined && !methodIsString) {
throw $ERR_INVALID_ARG_TYPE("options.method", "string", method);
}
if (methodIsString && method) {
if (!checkIsHttpToken(method)) {
throw $ERR_INVALID_HTTP_TOKEN("Method", method);
}
method = this[kMethod] = StringPrototypeToUpperCase.$call(method);
} else {
method = this[kMethod] = "GET";
}
const _maxHeaderSize = options.maxHeaderSize;
// TODO: Validators
// if (maxHeaderSize !== undefined)
// validateInteger(maxHeaderSize, "maxHeaderSize", 0);
this[kMaxHeaderSize] = _maxHeaderSize;
// const insecureHTTPParser = options.insecureHTTPParser;
// if (insecureHTTPParser !== undefined) {
// validateBoolean(insecureHTTPParser, 'options.insecureHTTPParser');
// }
// this.insecureHTTPParser = insecureHTTPParser;
var _joinDuplicateHeaders = options.joinDuplicateHeaders;
if (_joinDuplicateHeaders !== undefined) {
// TODO: Validators
// validateBoolean(
// options.joinDuplicateHeaders,
// "options.joinDuplicateHeaders",
// );
}
this[kJoinDuplicateHeaders] = _joinDuplicateHeaders;
if (options.pfx) {
throw new Error("pfx is not supported");
}
if (options.rejectUnauthorized !== undefined) this._ensureTls().rejectUnauthorized = options.rejectUnauthorized;
else {
let agentRejectUnauthorized = agent?.options?.rejectUnauthorized;
if (agentRejectUnauthorized !== undefined) this._ensureTls().rejectUnauthorized = agentRejectUnauthorized;
else {
// popular https-proxy-agent uses connectOpts
agentRejectUnauthorized = agent?.connectOpts?.rejectUnauthorized;
if (agentRejectUnauthorized !== undefined) this._ensureTls().rejectUnauthorized = agentRejectUnauthorized;
}
}
if (options.ca) {
if (!isValidTLSArray(options.ca))
throw new TypeError(
"ca argument must be an string, Buffer, TypedArray, BunFile or an array containing string, Buffer, TypedArray or BunFile",
);
this._ensureTls().ca = options.ca;
}
if (options.cert) {
if (!isValidTLSArray(options.cert))
throw new TypeError(
"cert argument must be an string, Buffer, TypedArray, BunFile or an array containing string, Buffer, TypedArray or BunFile",
);
this._ensureTls().cert = options.cert;
}
if (options.key) {
if (!isValidTLSArray(options.key))
throw new TypeError(
"key argument must be an string, Buffer, TypedArray, BunFile or an array containing string, Buffer, TypedArray or BunFile",
);
this._ensureTls().key = options.key;
}
if (options.passphrase) {
if (typeof options.passphrase !== "string") throw new TypeError("passphrase argument must be a string");
this._ensureTls().passphrase = options.passphrase;
}
if (options.ciphers) {
if (typeof options.ciphers !== "string") throw new TypeError("ciphers argument must be a string");
this._ensureTls().ciphers = options.ciphers;
}
if (options.servername) {
if (typeof options.servername !== "string") throw new TypeError("servername argument must be a string");
this._ensureTls().servername = options.servername;
}
if (options.secureOptions) {
if (typeof options.secureOptions !== "number") throw new TypeError("secureOptions argument must be a string");
this._ensureTls().secureOptions = options.secureOptions;
}
this[kPath] = options.path || "/";
if (cb) {
this.once("response", cb);
}
$debug(`new ClientRequest: ${this[kMethod]} ${this[kProtocol]}//${this[kHost]}:${this[kPort]}${this[kPath]}`);
// if (
// method === "GET" ||
// method === "HEAD" ||
// method === "DELETE" ||
// method === "OPTIONS" ||
// method === "TRACE" ||
// method === "CONNECT"
// ) {
// this.useChunkedEncodingByDefault = false;
// } else {
// this.useChunkedEncodingByDefault = true;
// }
this.finished = false;
this[kRes] = null;
this[kUpgradeOrConnect] = false;
this[kParser] = null;
this[kMaxHeadersCount] = null;
this[kReusedSocket] = false;
this[kHost] = host;
this[kProtocol] = protocol;
const timeout = options.timeout;
if (timeout !== undefined && timeout !== 0) {
this.setTimeout(timeout, undefined);
}
const { headers } = options;
const headersArray = $isJSArray(headers);
if (!headersArray) {
if (headers) {
for (let key in headers) {
this.setHeader(key, headers[key]);
}
}
// if (host && !this.getHeader("host") && setHost) {
// let hostHeader = host;
// // For the Host header, ensure that IPv6 addresses are enclosed
// // in square brackets, as defined by URI formatting
// // https://tools.ietf.org/html/rfc3986#section-3.2.2
// const posColon = StringPrototypeIndexOf.$call(hostHeader, ":");
// if (
// posColon !== -1 &&
// StringPrototypeIncludes.$call(hostHeader, ":", posColon + 1) &&
// StringPrototypeCharCodeAt.$call(hostHeader, 0) !== 91 /* '[' */
// ) {
// hostHeader = `[${hostHeader}]`;
// }
// if (port && +port !== defaultPort) {
// hostHeader += ":" + port;
// }
// this.setHeader("Host", hostHeader);
// }
var auth = options.auth;
if (auth && !this.getHeader("Authorization")) {
this.setHeader("Authorization", "Basic " + Buffer.from(auth).toString("base64"));
}
// if (this.getHeader("expect")) {
// if (this._header) {
// throw new ERR_HTTP_HEADERS_SENT("render");
// }
// this._storeHeader(
// this.method + " " + this.path + " HTTP/1.1\r\n",
// this[kOutHeaders],
// );
// }
// } else {
// this._storeHeader(
// this.method + " " + this.path + " HTTP/1.1\r\n",
// options.headers,
// );
}
// this[kUniqueHeaders] = parseUniqueHeadersOption(options.uniqueHeaders);
const { signal: _signal, ...optsWithoutSignal } = options;
this[kOptions] = optsWithoutSignal;
this._httpMessage = this;
process.nextTick(emitContinueAndSocketNT, this);
this[kEmitState] = 0;
this.setSocketKeepAlive = (enable = true, initialDelay = 0) => {
$debug(`${NODE_HTTP_WARNING}\n`, "WARN: ClientRequest.setSocketKeepAlive is a no-op");
};
this.setNoDelay = (noDelay = true) => {
$debug(`${NODE_HTTP_WARNING}\n`, "WARN: ClientRequest.setNoDelay is a no-op");
};
this[kClearTimeout] = () => {
const timeoutTimer = this[kTimeoutTimer];
if (timeoutTimer) {
clearTimeout(timeoutTimer);
this[kTimeoutTimer] = undefined;
this.removeAllListeners("timeout");
}
};
const onTimeout = () => {
this[kTimeoutTimer] = undefined;
this[kAbortController]?.abort();
this.emit("timeout");
};
this.setTimeout = (msecs, callback) => {
if (this.destroyed) return this;
this.timeout = msecs = validateMsecs(msecs, "msecs");
// Attempt to clear an existing timer in both cases -
// even if it will be rescheduled we don't want to leak an existing timer.
clearTimeout(this[kTimeoutTimer]!);
if (msecs === 0) {
if (callback !== undefined) {
validateFunction(callback, "callback");
this.removeListener("timeout", callback);
}
this[kTimeoutTimer] = undefined;
} else {
this[kTimeoutTimer] = setTimeout(onTimeout, msecs).unref();
if (callback !== undefined) {
validateFunction(callback, "callback");
this.once("timeout", callback);
}
}
return this;
};
}
const ClientRequestPrototype = {
constructor: ClientRequest,
__proto__: OutgoingMessage.prototype,
get path() {
return this[kPath];
},
get port() {
return this[kPort];
},
get method() {
return this[kMethod];
},
get host() {
return this[kHost];
},
get protocol() {
return this[kProtocol];
},
get agent() {
return this[kAgent];
},
set agent(value) {
this[kAgent] = value;
},
get aborted() {
return this[abortedSymbol] || this[kSignal]?.aborted || !!this[kAbortController]?.signal.aborted;
},
set aborted(value) {
this[abortedSymbol] = value;
},
get writable() {
return true;
},
};
ClientRequest.prototype = ClientRequestPrototype;
$setPrototypeDirect.$call(ClientRequest, OutgoingMessage);
function validateHost(host, name) {
if (host !== null && host !== undefined && typeof host !== "string") {
throw $ERR_INVALID_ARG_TYPE(`options.${name}`, ["string", "undefined", "null"], host);
}
return host;
}
function emitContinueAndSocketNT(self) {
if (self.destroyed) return;
// Ref: https://github.com/nodejs/node/blob/f63e8b7fa7a4b5e041ddec67307609ec8837154f/lib/_http_client.js#L803-L839
if (!(self[kEmitState] & (1 << ClientRequestEmitState.socket))) {
self[kEmitState] |= 1 << ClientRequestEmitState.socket;
self.emit("socket", self.socket);
}
// Emit continue event for the client (internally we auto handle it)
if (!self._closed && self.getHeader("expect") === "100-continue") {
self.emit("continue");
}
}
function emitAbortNextTick(self) {
self.emit("abort");
}
export default {
ClientRequest,
kBodyChunks,
abortedSymbol,
};

View File

@@ -0,0 +1,476 @@
const { Readable } = require("node:stream");
const {
abortedSymbol,
eofInProgress,
kHandle,
noBodySymbol,
typeSymbol,
NodeHTTPIncomingRequestType,
fakeSocketSymbol,
isAbortError,
emitErrorNextTickIfErrorListenerNT,
kEmptyObject,
getIsNextIncomingMessageHTTPS,
setIsNextIncomingMessageHTTPS,
NodeHTTPBodyReadState,
emitEOFIncomingMessage,
reqSymbol,
bodyStreamSymbol,
statusMessageSymbol,
statusCodeSymbol,
webRequestOrResponse,
NodeHTTPResponseAbortEvent,
STATUS_CODES,
} = require("internal/http");
const { FakeSocket } = require("node:_http_outgoing");
var defaultIncomingOpts = { type: "request" };
const nop = () => {};
const {
assignHeaders: assignHeadersFast,
setRequestTimeout,
headersTuple,
webRequestOrResponseHasBodyValue,
getCompleteWebRequestOrResponseBodyValueAsArrayBuffer,
} = $cpp("NodeHTTP.cpp", "createNodeHTTPInternalBinding") as {
assignHeaders: (object: any, req: Request, headersTuple: any) => boolean;
setRequestTimeout: (req: Request, timeout: number) => boolean;
headersTuple: any;
webRequestOrResponseHasBodyValue: (arg: any) => boolean;
getCompleteWebRequestOrResponseBodyValueAsArrayBuffer: (arg: any) => ArrayBuffer | undefined;
};
function assignHeadersSlow(object, req) {
const headers = req.headers;
var outHeaders = Object.create(null);
const rawHeaders: string[] = [];
var i = 0;
for (let key in headers) {
var originalKey = key;
var value = headers[originalKey];
key = key.toLowerCase();
if (key !== "set-cookie") {
value = String(value);
$putByValDirect(rawHeaders, i++, originalKey);
$putByValDirect(rawHeaders, i++, value);
outHeaders[key] = value;
} else {
if ($isJSArray(value)) {
outHeaders[key] = value.slice();
for (let entry of value) {
$putByValDirect(rawHeaders, i++, originalKey);
$putByValDirect(rawHeaders, i++, entry);
}
} else {
value = String(value);
outHeaders[key] = [value];
$putByValDirect(rawHeaders, i++, originalKey);
$putByValDirect(rawHeaders, i++, value);
}
}
}
object.headers = outHeaders;
object.rawHeaders = rawHeaders;
}
function assignHeaders(object, req) {
// This fast path is an 8% speedup for a "hello world" node:http server, and a 7% speedup for a "hello world" express server
if (assignHeadersFast(req, object, headersTuple)) {
const headers = $getInternalField(headersTuple, 0);
const rawHeaders = $getInternalField(headersTuple, 1);
$putInternalField(headersTuple, 0, undefined);
$putInternalField(headersTuple, 1, undefined);
object.headers = headers;
object.rawHeaders = rawHeaders;
return true;
} else {
assignHeadersSlow(object, req);
return false;
}
}
function onIncomingMessagePauseNodeHTTPResponse(this: IncomingMessage) {
const handle = this[kHandle];
if (handle && !this.destroyed) {
const paused = handle.pause();
}
}
function onIncomingMessageResumeNodeHTTPResponse(this: IncomingMessage) {
const handle = this[kHandle];
if (handle && !this.destroyed) {
const resumed = handle.resume();
if (resumed && resumed !== true) {
const bodyReadState = handle.hasBody;
if ((bodyReadState & NodeHTTPBodyReadState.done) !== 0) {
emitEOFIncomingMessage(this);
}
this.push(resumed);
}
}
}
function IncomingMessage(req, options = defaultIncomingOpts) {
this[abortedSymbol] = false;
this[eofInProgress] = false;
this._consuming = false;
this._dumped = false;
this.complete = false;
this._closed = false;
// (url, method, headers, rawHeaders, handle, hasBody)
if (req === kHandle) {
this[typeSymbol] = NodeHTTPIncomingRequestType.NodeHTTPResponse;
this.url = arguments[1];
this.method = arguments[2];
this.headers = arguments[3];
this.rawHeaders = arguments[4];
this[kHandle] = arguments[5];
this[noBodySymbol] = !arguments[6];
this[fakeSocketSymbol] = arguments[7];
Readable.$call(this);
// If there's a body, pay attention to pause/resume events
if (arguments[6]) {
this.on("pause", onIncomingMessagePauseNodeHTTPResponse);
this.on("resume", onIncomingMessageResumeNodeHTTPResponse);
}
} else {
this[noBodySymbol] = false;
Readable.$call(this);
var { [typeSymbol]: type, [reqSymbol]: nodeReq } = options || {};
this[webRequestOrResponse] = req;
this[typeSymbol] = type;
this[bodyStreamSymbol] = undefined;
const statusText = (req as Response)?.statusText;
this[statusMessageSymbol] = statusText !== "" ? statusText || null : "";
this[statusCodeSymbol] = (req as Response)?.status || 200;
if (type === NodeHTTPIncomingRequestType.FetchRequest || type === NodeHTTPIncomingRequestType.FetchResponse) {
if (!assignHeaders(this, req)) {
this[fakeSocketSymbol] = req;
}
} else {
// Node defaults url and method to null.
this.url = "";
this.method = null;
this.rawHeaders = [];
}
this[noBodySymbol] =
type === NodeHTTPIncomingRequestType.FetchRequest // TODO: Add logic for checking for body on response
? requestHasNoBody(this.method, this)
: false;
if (getIsNextIncomingMessageHTTPS()) {
this.socket.encrypted = true;
setIsNextIncomingMessageHTTPS(false);
}
}
this._readableState.readingMore = true;
}
function onDataIncomingMessage(
this: import("node:http").IncomingMessage,
chunk,
isLast,
aborted: NodeHTTPResponseAbortEvent,
) {
if (aborted === NodeHTTPResponseAbortEvent.abort) {
this.destroy();
return;
}
if (chunk && !this._dumped) this.push(chunk);
if (isLast) {
emitEOFIncomingMessage(this);
}
}
const IncomingMessagePrototype = {
constructor: IncomingMessage,
__proto__: Readable.prototype,
_construct(callback) {
// TODO: streaming
const type = this[typeSymbol];
if (type === NodeHTTPIncomingRequestType.FetchResponse) {
if (!webRequestOrResponseHasBodyValue(this[webRequestOrResponse])) {
this.complete = true;
this.push(null);
}
}
callback();
},
// Call this instead of resume() if we want to just
// dump all the data to /dev/null
_dump() {
if (!this._dumped) {
this._dumped = true;
// If there is buffered data, it may trigger 'data' events.
// Remove 'data' event listeners explicitly.
this.removeAllListeners("data");
const handle = this[kHandle];
if (handle) {
handle.ondata = undefined;
}
this.resume();
}
},
_read(size) {
if (!this._consuming) {
this._readableState.readingMore = false;
this._consuming = true;
}
const socket = this.socket;
if (socket && socket.readable) {
//https://github.com/nodejs/node/blob/13e3aef053776be9be262f210dc438ecec4a3c8d/lib/_http_incoming.js#L211-L213
socket.resume();
}
if (this[eofInProgress]) {
// There is a nextTick pending that will emit EOF
return;
}
let internalRequest;
if (this[noBodySymbol]) {
emitEOFIncomingMessage(this);
return;
} else if ((internalRequest = this[kHandle])) {
const bodyReadState = internalRequest.hasBody;
if (
(bodyReadState & NodeHTTPBodyReadState.done) !== 0 ||
bodyReadState === NodeHTTPBodyReadState.none ||
this._dumped
) {
emitEOFIncomingMessage(this);
}
if ((bodyReadState & NodeHTTPBodyReadState.hasBufferedDataDuringPause) !== 0) {
const drained = internalRequest.drainRequestBody();
if (drained && !this._dumped) {
this.push(drained);
}
}
if (!internalRequest.ondata) {
internalRequest.ondata = onDataIncomingMessage.bind(this);
internalRequest.hasCustomOnData = false;
}
return true;
} else if (this[bodyStreamSymbol] == null) {
// If it's all available right now, we skip going through ReadableStream.
let completeBody = getCompleteWebRequestOrResponseBodyValueAsArrayBuffer(this[webRequestOrResponse]);
if (completeBody) {
$assert(completeBody instanceof ArrayBuffer, "completeBody is not an ArrayBuffer");
$assert(completeBody.byteLength > 0, "completeBody should not be empty");
// They're ignoring the data. Let's not do anything with it.
if (!this._dumped) {
this.push(new Buffer(completeBody));
}
emitEOFIncomingMessage(this);
return;
}
const reader = this[webRequestOrResponse].body?.getReader?.() as ReadableStreamDefaultReader;
if (!reader) {
emitEOFIncomingMessage(this);
return;
}
this[bodyStreamSymbol] = reader;
consumeStream(this, reader);
}
return;
},
_finish() {
this.emit("prefinish");
},
_destroy: function IncomingMessage_destroy(err, cb) {
const shouldEmitAborted = !this.readableEnded || !this.complete;
if (shouldEmitAborted) {
this[abortedSymbol] = true;
// IncomingMessage emits 'aborted'.
// Client emits 'abort'.
this.emit("aborted");
}
// Suppress "AbortError" from fetch() because we emit this in the 'aborted' event
if (isAbortError(err)) {
err = undefined;
}
var nodeHTTPResponse = this[kHandle];
if (nodeHTTPResponse) {
this[kHandle] = undefined;
nodeHTTPResponse.onabort = nodeHTTPResponse.ondata = undefined;
if (!nodeHTTPResponse.finished && shouldEmitAborted) {
nodeHTTPResponse.abort();
}
const socket = this.socket;
if (socket && !socket.destroyed && shouldEmitAborted) {
socket.destroy(err);
}
} else {
const stream = this[bodyStreamSymbol];
this[bodyStreamSymbol] = undefined;
const streamState = stream?.$state;
if (streamState === $streamReadable || streamState === $streamWaiting || streamState === $streamWritable) {
stream?.cancel?.().catch(nop);
}
const socket = this[fakeSocketSymbol];
if (socket && !socket.destroyed && shouldEmitAborted) {
socket.destroy(err);
}
}
if ($isCallable(cb)) {
emitErrorNextTickIfErrorListenerNT(this, err, cb);
}
},
get aborted() {
return this[abortedSymbol];
},
set aborted(value) {
this[abortedSymbol] = value;
},
get connection() {
return (this[fakeSocketSymbol] ??= new FakeSocket());
},
get statusCode() {
return this[statusCodeSymbol];
},
set statusCode(value) {
if (!(value in STATUS_CODES)) return;
this[statusCodeSymbol] = value;
},
get statusMessage() {
return this[statusMessageSymbol];
},
set statusMessage(value) {
this[statusMessageSymbol] = value;
},
get httpVersion() {
return "1.1";
},
set httpVersion(value) {
// noop
},
get httpVersionMajor() {
return 1;
},
set httpVersionMajor(value) {
// noop
},
get httpVersionMinor() {
return 1;
},
set httpVersionMinor(value) {
// noop
},
get rawTrailers() {
return [];
},
set rawTrailers(value) {
// noop
},
get trailers() {
return kEmptyObject;
},
set trailers(value) {
// noop
},
setTimeout(msecs, callback) {
this.take;
const req = this[kHandle] || this[webRequestOrResponse];
if (req) {
if (setRequestTimeout(req, Math.ceil(msecs / 1000))) {
typeof callback === "function" && this.once("timeout", callback);
} else {
// Actually a Response object
req.setTimeout?.(msecs, callback);
}
}
return this;
},
get socket() {
return (this[fakeSocketSymbol] ??= new FakeSocket());
},
set socket(value) {
this[fakeSocketSymbol] = value;
},
} satisfies typeof import("node:http").IncomingMessage.prototype;
IncomingMessage.prototype = IncomingMessagePrototype;
$setPrototypeDirect.$call(IncomingMessage, Readable);
function requestHasNoBody(method, req) {
if ("GET" === method || "HEAD" === method || "TRACE" === method || "CONNECT" === method || "OPTIONS" === method)
return true;
const headers = req?.headers;
const contentLength = headers?.["content-length"];
if (!parseInt(contentLength, 10)) return true;
return false;
}
async function consumeStream(self, reader: ReadableStreamDefaultReader) {
var done = false,
value,
aborted = false;
try {
while (true) {
const result = reader.readMany();
if ($isPromise(result)) {
({ done, value } = await result);
} else {
({ done, value } = result);
}
if (self.destroyed || (aborted = self[abortedSymbol])) {
break;
}
if (!self._dumped) {
for (var v of value) {
self.push(v);
}
}
if (self.destroyed || (aborted = self[abortedSymbol]) || done) {
break;
}
}
} catch (err) {
if (aborted || self.destroyed) return;
self.destroy(err);
} finally {
reader?.cancel?.().catch?.(nop);
}
if (!self.complete) {
emitEOFIncomingMessage(self);
}
}
export default {
IncomingMessage,
};

View File

@@ -0,0 +1,394 @@
const { Stream, Duplex } = require("internal/stream");
const { validateFunction } = require("internal/validators");
const {
headerStateSymbol,
NodeHTTPHeaderState,
kAbortController,
fakeSocketSymbol,
validateHeaderName,
validateHeaderValue,
headersSymbol,
kBodyChunks,
kEmitState,
ClientRequestEmitState,
kInternalSocketData,
kEmptyObject,
validateMsecs,
hasServerResponseFinished,
timeoutTimerSymbol,
kHandle,
serverSymbol,
} = require("internal/http");
const { kAutoDestroyed } = require("internal/shared");
const { getHeader, setHeader, Headers } = $cpp("NodeHTTP.cpp", "createNodeHTTPInternalBinding") as {
getHeader: (headers: Headers, name: string) => string | undefined;
setHeader: (headers: Headers, name: string, value: string) => void;
Headers: (typeof globalThis)["Headers"];
};
const getRawKeys = $newCppFunction("JSFetchHeaders.cpp", "jsFetchHeaders_getRawKeys", 0);
function OutgoingMessage(options) {
if (!new.target) {
return new OutgoingMessage(options);
}
Stream.$call(this, options);
this.sendDate = true;
this.finished = false;
this[headerStateSymbol] = NodeHTTPHeaderState.none;
this[kAbortController] = null;
this.writable = true;
this.destroyed = false;
this._hasBody = true;
this._trailer = "";
this._contentLength = null;
this._closed = false;
this._header = null;
this._headerSent = false;
}
const OutgoingMessagePrototype = {
constructor: OutgoingMessage,
__proto__: Stream.prototype,
// These are fields which we do not use in our implementation, but are observable in Node.js.
_keepAliveTimeout: 0,
_defaultKeepAlive: true,
shouldKeepAlive: true,
_onPendingData: function nop() {},
outputSize: 0,
outputData: [],
strictContentLength: false,
_removedTE: false,
_removedContLen: false,
_removedConnection: false,
usesChunkedEncodingByDefault: true,
_closed: false,
appendHeader(name, value) {
var headers = (this[headersSymbol] ??= new Headers());
headers.append(name, value);
return this;
},
_implicitHeader() {
throw $ERR_METHOD_NOT_IMPLEMENTED("_implicitHeader()");
},
flushHeaders() {},
getHeader(name) {
return getHeader(this[headersSymbol], name);
},
// Overridden by ClientRequest and ServerResponse; this version will be called only if the user constructs OutgoingMessage directly.
write(chunk, encoding, callback) {
if ($isCallable(chunk)) {
callback = chunk;
chunk = undefined;
} else if ($isCallable(encoding)) {
callback = encoding;
encoding = undefined;
} else if (!$isCallable(callback)) {
callback = undefined;
encoding = undefined;
}
hasServerResponseFinished(this, chunk, callback);
if (chunk) {
const len = Buffer.byteLength(chunk, encoding || (typeof chunk === "string" ? "utf8" : "buffer"));
if (len > 0) {
this.outputSize += len;
this.outputData.push(chunk);
}
}
return this.writableHighWaterMark >= this.outputSize;
},
getHeaderNames() {
var headers = this[headersSymbol];
if (!headers) return [];
return Array.from(headers.keys());
},
getRawHeaderNames() {
var headers = this[headersSymbol];
if (!headers) return [];
return getRawKeys.$call(headers);
},
getHeaders() {
const headers = this[headersSymbol];
if (!headers) return kEmptyObject;
return headers.toJSON();
},
removeHeader(name) {
if ((this._header !== undefined && this._header !== null) || this[headerStateSymbol] === NodeHTTPHeaderState.sent) {
throw $ERR_HTTP_HEADERS_SENT("remove");
}
const headers = this[headersSymbol];
if (!headers) return;
headers.delete(name);
},
setHeader(name, value) {
if ((this._header !== undefined && this._header !== null) || this[headerStateSymbol] === NodeHTTPHeaderState.sent) {
throw $ERR_HTTP_HEADERS_SENT("set");
}
validateHeaderName(name);
validateHeaderValue(name, value);
const headers = (this[headersSymbol] ??= new Headers());
setHeader(headers, name, value);
return this;
},
hasHeader(name) {
const headers = this[headersSymbol];
if (!headers) return false;
return headers.has(name);
},
get headers() {
const headers = this[headersSymbol];
if (!headers) return kEmptyObject;
return headers.toJSON();
},
set headers(value) {
this[headersSymbol] = new Headers(value);
},
addTrailers(headers) {
throw new Error("not implemented");
},
setTimeout(msecs, callback) {
if (this.destroyed) return this;
this.timeout = msecs = validateMsecs(msecs, "msecs");
// Attempt to clear an existing timer in both cases -
// even if it will be rescheduled we don't want to leak an existing timer.
clearTimeout(this[timeoutTimerSymbol]);
if (msecs === 0) {
if (callback != null) {
if (!$isCallable(callback)) validateFunction(callback, "callback");
this.removeListener("timeout", callback);
}
this[timeoutTimerSymbol] = undefined;
} else {
this[timeoutTimerSymbol] = setTimeout(onTimeout.bind(this), msecs).unref();
if (callback != null) {
if (!$isCallable(callback)) validateFunction(callback, "callback");
this.once("timeout", callback);
}
}
return this;
},
get connection() {
return this.socket;
},
get socket() {
this[fakeSocketSymbol] = this[fakeSocketSymbol] ?? new FakeSocket();
return this[fakeSocketSymbol];
},
set socket(value) {
this[fakeSocketSymbol] = value;
},
get chunkedEncoding() {
return false;
},
set chunkedEncoding(value) {
// noop
},
get writableObjectMode() {
return false;
},
get writableLength() {
return 0;
},
get writableHighWaterMark() {
return 16 * 1024;
},
get writableNeedDrain() {
return !this.destroyed && !this.finished && this[kBodyChunks] && this[kBodyChunks].length > 0;
},
get writableEnded() {
return this.finished;
},
get writableFinished() {
return this.finished && !!(this[kEmitState] & (1 << ClientRequestEmitState.finish));
},
_send(data, encoding, callback, byteLength) {
if (this.destroyed) {
return false;
}
return this.write(data, encoding, callback);
},
end(chunk, encoding, callback) {
return this;
},
destroy(err?: Error) {
if (this.destroyed) return this;
const handle = this[kHandle];
this.destroyed = true;
if (handle) {
handle.abort();
}
return this;
},
};
OutgoingMessage.prototype = OutgoingMessagePrototype;
$setPrototypeDirect.$call(OutgoingMessage, Stream);
function onTimeout() {
this[timeoutTimerSymbol] = undefined;
this[kAbortController]?.abort();
const handle = this[kHandle];
this.emit("timeout");
if (handle) {
handle.abort();
}
}
type FakeSocket = InstanceType<typeof FakeSocket>;
var FakeSocket = class Socket extends Duplex {
[kInternalSocketData]!: [typeof Server, typeof OutgoingMessage, typeof Request];
bytesRead = 0;
bytesWritten = 0;
connecting = false;
timeout = 0;
isServer = false;
#address;
address() {
// Call server.requestIP() without doing any property getter twice.
var internalData;
return (this.#address ??=
(internalData = this[kInternalSocketData])?.[0]?.[serverSymbol].requestIP(internalData[2]) ?? {});
}
get bufferSize() {
return this.writableLength;
}
connect(port, host, connectListener) {
return this;
}
_destroy(err, callback) {
const socketData = this[kInternalSocketData];
if (!socketData) return; // sometimes 'this' is Socket not FakeSocket
if (!socketData[1]["req"][kAutoDestroyed]) socketData[1].end();
}
_final(callback) {}
get localAddress() {
return this.address() ? "127.0.0.1" : undefined;
}
get localFamily() {
return "IPv4";
}
get localPort() {
return 80;
}
get pending() {
return this.connecting;
}
_read(size) {}
get readyState() {
if (this.connecting) return "opening";
if (this.readable) {
return this.writable ? "open" : "readOnly";
} else {
return this.writable ? "writeOnly" : "closed";
}
}
ref() {
return this;
}
get remoteAddress() {
return this.address()?.address;
}
set remoteAddress(val) {
// initialize the object so that other properties wouldn't be lost
this.address().address = val;
}
get remotePort() {
return this.address()?.port;
}
set remotePort(val) {
// initialize the object so that other properties wouldn't be lost
this.address().port = val;
}
get remoteFamily() {
return this.address()?.family;
}
set remoteFamily(val) {
// initialize the object so that other properties wouldn't be lost
this.address().family = val;
}
resetAndDestroy() {}
setKeepAlive(enable = false, initialDelay = 0) {}
setNoDelay(noDelay = true) {
return this;
}
setTimeout(timeout, callback) {
const socketData = this[kInternalSocketData];
if (!socketData) return; // sometimes 'this' is Socket not FakeSocket
const [server, http_res, req] = socketData;
http_res?.req?.setTimeout(timeout, callback);
return this;
}
unref() {
return this;
}
_write(chunk, encoding, callback) {}
};
Object.defineProperty(FakeSocket, "name", { value: "Socket" });
export default {
OutgoingMessage,
FakeSocket,
OutgoingMessagePrototype,
};

1637
src/js/node/_http_server.ts Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,27 @@
'use strict';
// This tests that the error emitted on the socket does
// not get fired again when the 'error' event handler throws
// an error.
const common = require('../common');
const { addresses } = require('../common/internet');
const { errorLookupMock } = require('../common/dns');
const assert = require('assert');
const http = require('http');
const host = addresses.INVALID_HOST;
const req = http.get({
host,
lookup: common.mustCall(errorLookupMock())
});
const err = new Error('mock unexpected code error');
req.on('error', common.mustCall(() => {
throw err;
}));
process.on('uncaughtException', common.mustCall((e) => {
assert.strictEqual(e, err);
}));

View File

@@ -0,0 +1,27 @@
'use strict';
const common = require('../common');
const assert = require('node:assert');
const http = require('node:http');
const headers = { foo: 'Bar' };
const server = http.createServer(common.mustCall((req, res) => {
assert.strictEqual(req.url, '/ping?q=term');
assert.strictEqual(req.headers?.foo, headers.foo);
req.resume();
req.on('end', () => {
res.writeHead(200);
res.end('pong');
});
}));
server.listen(0, common.localhostIPv4, () => {
const { address, port } = server.address();
const url = new URL(`http://${address}:${port}/ping?q=term`);
url.headers = headers;
const clientReq = http.request(url);
clientReq.on('close', common.mustCall(() => {
server.close();
}));
clientReq.end();
});

View File

@@ -0,0 +1,14 @@
'use strict';
const common = require('../common');
const http = require('http');
const server = http.createServer((req, res) => res.flushHeaders());
server.listen(common.mustCall(() => {
const req =
http.get({ port: server.address().port }, common.mustCall((res) => {
res.on('timeout', common.mustCall(() => req.destroy()));
res.setTimeout(1);
server.close();
}));
}));

View File

@@ -0,0 +1,40 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
// Verify that after calling end() on an `OutgoingMessage` (or a type that
// inherits from `OutgoingMessage`), its `writable` property is not set to false
const server = http.createServer(common.mustCall(function(req, res) {
assert.strictEqual(res.writable, true);
assert.strictEqual(res.finished, false);
assert.strictEqual(res.writableEnded, false);
res.end();
// res.writable is set to false after it has finished sending
// Ref: https://github.com/nodejs/node/issues/15029
assert.strictEqual(res.writable, true);
assert.strictEqual(res.finished, true);
assert.strictEqual(res.writableEnded, true);
server.close();
}));
server.listen(0);
server.on('listening', common.mustCall(function() {
const clientRequest = http.request({
port: server.address().port,
method: 'GET',
path: '/'
});
assert.strictEqual(clientRequest.writable, true);
clientRequest.end();
// Writable is still true when close
// THIS IS LEGACY, we cannot change it
// unless we break error detection
assert.strictEqual(clientRequest.writable, true);
}));

View File

@@ -0,0 +1,86 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const net = require('net');
const Countdown = require('../common/countdown');
const testCases = [
{ path: '/200', statusMessage: 'OK',
response: 'HTTP/1.1 200 OK\r\n\r\n' },
{ path: '/500', statusMessage: 'Internal Server Error',
response: 'HTTP/1.1 500 Internal Server Error\r\n\r\n' },
{ path: '/302', statusMessage: 'Moved Temporarily',
response: 'HTTP/1.1 302 Moved Temporarily\r\n\r\n' },
{ path: '/missing', statusMessage: '',
response: 'HTTP/1.1 200 \r\n\r\n' },
{ path: '/missing-no-space', statusMessage: '',
response: 'HTTP/1.1 200\r\n\r\n' },
];
testCases.findByPath = function(path) {
const matching = this.filter(function(testCase) {
return testCase.path === path;
});
if (matching.length === 0) {
assert.fail(`failed to find test case with path ${path}`);
}
return matching[0];
};
const server = net.createServer(function(connection) {
connection.on('data', function(data) {
const path = data.toString().match(/GET (.*) HTTP\/1\.1/)[1];
const testCase = testCases.findByPath(path);
connection.write(testCase.response);
connection.end();
});
});
const countdown = new Countdown(testCases.length, () => server.close());
function runTest(testCaseIndex) {
const testCase = testCases[testCaseIndex];
http.get({
port: server.address().port,
path: testCase.path
}, function(response) {
console.log(`client: expected status message: ${testCase.statusMessage}`);
console.log(`client: actual status message: ${response.statusMessage}`);
assert.strictEqual(testCase.statusMessage, response.statusMessage);
response.on('aborted', common.mustNotCall());
response.on('end', function() {
countdown.dec();
if (testCaseIndex + 1 < testCases.length) {
runTest(testCaseIndex + 1);
}
});
response.resume();
});
}
server.listen(0, function() { runTest(0); });

View File

@@ -0,0 +1,90 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const Countdown = require('../common/countdown');
const MAX_REQUESTS = 13;
let reqNum = 0;
function test(res, header, code) {
assert.throws(() => {
res.writeHead(header);
}, {
code: 'ERR_HTTP_INVALID_STATUS_CODE',
name: 'RangeError',
message: `Invalid status code: ${code}`
});
}
const server = http.Server(common.mustCall(function(req, res) {
switch (reqNum) {
case 0:
test(res, -1, '-1');
break;
case 1:
test(res, Infinity, 'Infinity');
break;
case 2:
test(res, NaN, 'NaN');
break;
case 3:
test(res, {}, '{}');
break;
case 4:
test(res, 99, '99');
break;
case 5:
test(res, 1000, '1000');
break;
case 6:
test(res, '1000', '1000');
break;
case 7:
test(res, null, 'null');
break;
case 8:
test(res, true, 'true');
break;
case 9:
test(res, [], '[]');
break;
case 10:
test(res, 'this is not valid', 'this is not valid');
break;
case 11:
test(res, '404 this is not valid either', '404 this is not valid either');
break;
case 12:
assert.throws(() => { res.writeHead(); },
{
code: 'ERR_HTTP_INVALID_STATUS_CODE',
name: 'RangeError',
message: 'Invalid status code: undefined'
});
this.close();
break;
default:
throw new Error('Unexpected request');
}
res.statusCode = 200;
res.end();
}, MAX_REQUESTS));
server.listen();
const countdown = new Countdown(MAX_REQUESTS, () => server.close());
server.on('listening', function makeRequest() {
http.get({
port: this.address().port
}, (res) => {
assert.strictEqual(res.statusCode, 200);
res.on('end', () => {
countdown.dec();
reqNum = MAX_REQUESTS - countdown.remaining;
if (countdown.remaining > 0)
makeRequest.call(this);
});
res.resume();
});
});

View File

@@ -3,7 +3,7 @@ import { expect, test } from "bun:test";
import { bunEnv, bunExe } from "harness";
test("node:http should not crash when server throws, and should abruptly close the socket", async () => {
const { promise, resolve, reject } = Promise.withResolvers();
const { promise, resolve, reject } = Promise.withResolvers<string>();
await using server = spawn({
cwd: import.meta.dirname,
cmd: [bunExe(), "04298.fixture.js"],
@@ -12,12 +12,12 @@ test("node:http should not crash when server throws, and should abruptly close t
ipc(url) {
resolve(url);
},
onExit(exitCode, signalCode) {
onExit(proc, exitCode, signalCode) {
if (signalCode || exitCode !== 0) {
reject(new Error(`process exited with code ${signalCode || exitCode}`));
}
},
});
const url = await promise;
const response = await fetch(url);
const response = await fetch(url).catch(() => {});
});