Compare commits

...

72 Commits

Author SHA1 Message Date
Ciro Spaciari MacBook
24751ed940 address review 2026-01-09 13:14:17 -08:00
Ciro Spaciari
27f08bc305 Merge branch 'main' into ciro/fetch-upgrade-node-http 2026-01-09 11:31:36 -08:00
Ciro Spaciari MacBook
e61c23e26b revert but keep ReadableStream 2026-01-09 10:32:51 -08:00
Ciro Spaciari MacBook
65bb8cb36a refactor(http): convert ClientRequest body generators to ReadableStream
Replace async generator functions with ReadableStream factories for
fetchOptions.body in ClientRequest. This moves the body generation
logic to module scope and avoids issues with bound async generator
functions not being recognized by fetch.

- bodyStreamClientRequest: Returns ReadableStream for duplex request bodies
- upgradeBodyStreamClientRequest: Returns ReadableStream for HTTP upgrade bodies
2026-01-08 16:10:44 -08:00
Ciro Spaciari MacBook
a706902996 Merge remote-tracking branch 'origin/main' into ciro/fetch-upgrade-node-http 2026-01-08 15:50:37 -08:00
Ciro Spaciari MacBook
2836cd49aa refactor(http): move ClientRequest closures to module scope
Refactor the ClientRequest constructor to eliminate anonymous function
closures that capture the entire constructor scope. This improves memory
characteristics by allowing earlier garbage collection.

Changes:
- Add symbols to internal/http.ts for mutable state: kWriteCount,
  kResolveNextChunk, kFetching, kOnEnd, kHandleResponse
- Move emit helpers to module scope: maybeEmitSocketClientRequest,
  maybeEmitPrefinishClientRequest, maybeEmitFinishClientRequest,
  maybeEmitCloseClientRequest
- Move event handlers to module scope: socketCloseListenerClientRequest,
  onAbortClientRequest
- Move write functions to module scope: pushChunkClientRequest,
  writeInternalClientRequest, writeClientRequest, endClientRequest
- Move utility functions to module scope: flushHeadersClientRequest,
  destroyClientRequest, abortClientRequest, ensureTlsClientRequest,
  signalAbortHandler, clearTimeoutClientRequest
- Move send/fetch functions to module scope: sendClientRequest,
  bodyIteratorClientRequest, goClientRequest, startFetchClientRequest,
  iterateCandidatesClientRequest, getURLClientRequest,
  failDNSLookupClientRequest

The streaming body generator still uses a minimal closure that only
captures 'self', with all mutable state stored in symbol properties.
2026-01-08 15:40:54 -08:00
Ciro Spaciari
9ffb9763f5 Merge branch 'main' into ciro/fetch-upgrade-node-http 2025-12-01 11:49:48 -08:00
Ciro Spaciari
f02ebff5d8 Merge branch 'main' into ciro/fetch-upgrade-node-http 2025-11-19 11:35:39 -08:00
Ciro Spaciari
f358f77512 revert so fetch is keeped alive 2025-11-14 15:44:10 -08:00
Ciro Spaciari
28819a7e49 bind only 1 2025-11-14 15:34:08 -08:00
Ciro Spaciari
2c719f7d09 address review 2025-11-14 15:14:12 -08:00
Ciro Spaciari
fbee1c8e74 Merge branch 'main' into ciro/fetch-upgrade-node-http 2025-11-14 14:57:58 -08:00
Ciro Spaciari
8fc82cf22a address review 2025-11-14 14:57:29 -08:00
Ciro Spaciari
06922bcfae Merge branch 'main' into ciro/fetch-upgrade-node-http 2025-11-14 13:07:38 -08:00
Ciro Spaciari
4f0866d7b8 Merge branch 'main' into ciro/fetch-upgrade-node-http 2025-11-13 15:12:28 -08:00
Ciro Spaciari
e475055148 fix duplicate declare 2025-11-13 14:47:36 -08:00
autofix-ci[bot]
58330503f2 [autofix.ci] apply automated fixes 2025-11-13 22:44:53 +00:00
Ciro Spaciari
a2bfb02826 Merge branch 'main' into ciro/fetch-upgrade-node-http 2025-11-13 14:43:13 -08:00
Ciro Spaciari
f052b2d944 Merge branch 'main' into ciro/fetch-upgrade-node-http 2025-09-05 11:54:56 -07:00
Ciro Spaciari
6333dd44cb fixes #20547 2025-09-04 17:21:07 -07:00
Ciro Spaciari
5de400ee0c Merge branch 'ciro/fetch-upgrade' into ciro/fetch-upgrade-node-http 2025-09-04 17:16:16 -07:00
Ciro Spaciari
16b632e249 missed that 2025-09-04 17:15:40 -07:00
Ciro Spaciari
cedbe28d01 avoid chunked here 2025-09-04 17:15:40 -07:00
Ciro Spaciari
7f36e5c0ac ok 2025-09-04 17:15:40 -07:00
Ciro Spaciari
aaae722c0d we need to close at some point 2025-09-04 17:15:40 -07:00
Ciro Spaciari
b7ce235e93 dont break stuff 2025-09-04 17:15:40 -07:00
Ciro Spaciari
e10c995139 opsie 2025-09-04 17:15:40 -07:00
Ciro Spaciari
b05964def4 more generic 2025-09-04 17:15:40 -07:00
autofix-ci[bot]
4b61f99adf [autofix.ci] apply automated fixes 2025-09-04 17:15:40 -07:00
Ciro Spaciari
6ce1e14e5c ok 2025-09-04 17:15:40 -07:00
Ciro Spaciari
27ec84905d Merge branch 'ciro/fetch-upgrade' into ciro/fetch-upgrade-node-http 2025-09-04 17:10:49 -07:00
Ciro Spaciari
b0dac57298 missed that 2025-09-04 17:10:40 -07:00
Ciro Spaciari
c33fc879dc more 2025-09-04 17:10:18 -07:00
Ciro Spaciari
36a2fb6093 more 2025-09-04 17:10:04 -07:00
Ciro Spaciari
2560c08922 Merge branch 'ciro/fetch-upgrade' into ciro/fetch-upgrade-node-http 2025-09-04 16:37:10 -07:00
Ciro Spaciari
4f0d126d75 avoid chunked here 2025-09-04 16:36:54 -07:00
Ciro Spaciari
3a5c3ac657 ok 2025-09-04 15:53:31 -07:00
Ciro Spaciari
e782f9ebbd Merge branch 'ciro/fetch-upgrade' into ciro/fetch-upgrade-node-http 2025-09-04 15:45:14 -07:00
Ciro Spaciari
3a10be5191 ok 2025-09-04 15:45:06 -07:00
autofix-ci[bot]
b49e20c193 [autofix.ci] apply automated fixes 2025-09-04 22:40:13 +00:00
Ciro Spaciari
19491b9db3 minimal test 2025-09-04 15:38:49 -07:00
Ciro Spaciari
79bbada1ed linter 2025-09-04 15:27:35 -07:00
Ciro Spaciari
ccf021fab9 Merge branch 'ciro/fetch-upgrade' into ciro/fetch-upgrade-node-http 2025-09-04 15:22:02 -07:00
Ciro Spaciari
8df4827833 we need to close at some point 2025-09-04 15:21:53 -07:00
Ciro Spaciari
0609fa5122 dont break stuff 2025-09-04 15:21:53 -07:00
Ciro Spaciari
ed21db9414 opsie 2025-09-04 15:21:53 -07:00
Ciro Spaciari
8727416808 more generic 2025-09-04 15:21:53 -07:00
autofix-ci[bot]
c7f6623878 [autofix.ci] apply automated fixes 2025-09-04 15:21:53 -07:00
Ciro Spaciari
1503715c0e ok 2025-09-04 15:21:53 -07:00
Ciro Spaciari
4cbe315002 Merge branch 'ciro/fetch-upgrade' into ciro/fetch-upgrade-node-http 2025-09-04 15:21:38 -07:00
Ciro Spaciari
fbdde3a89c we need to close at some point 2025-09-04 15:21:11 -07:00
autofix-ci[bot]
ed037cece0 [autofix.ci] apply automated fixes 2025-09-04 22:14:29 +00:00
Ciro Spaciari
e2767b970d dont break stuff 2025-09-04 15:07:34 -07:00
Ciro Spaciari
68c5a293eb opsie 2025-09-04 15:07:34 -07:00
Ciro Spaciari
6b5e41a0b3 more generic 2025-09-04 15:07:34 -07:00
autofix-ci[bot]
50bc82a97e [autofix.ci] apply automated fixes 2025-09-04 15:07:34 -07:00
Ciro Spaciari
345666b194 ok 2025-09-04 15:07:34 -07:00
Ciro Spaciari
01785cf3b7 experiment 2025-09-04 15:06:20 -07:00
autofix-ci[bot]
56f1d991cc [autofix.ci] apply automated fixes (attempt 3/3) 2025-09-04 20:05:33 +00:00
autofix-ci[bot]
0853c555b2 [autofix.ci] apply automated fixes (attempt 2/3) 2025-09-04 20:03:03 +00:00
autofix-ci[bot]
ae42b8f045 [autofix.ci] apply automated fixes 2025-09-04 20:01:32 +00:00
Ciro Spaciari
e5abcf45ef dont break stuff 2025-09-04 12:59:30 -07:00
autofix-ci[bot]
0e8f4ba114 [autofix.ci] apply automated fixes (attempt 3/3) 2025-09-04 19:31:15 +00:00
autofix-ci[bot]
efda53ea6f [autofix.ci] apply automated fixes (attempt 2/3) 2025-09-04 19:29:16 +00:00
autofix-ci[bot]
d561b17b46 [autofix.ci] apply automated fixes 2025-09-04 19:27:41 +00:00
Ciro Spaciari
1dc1d4795e opsie 2025-09-04 12:25:02 -07:00
autofix-ci[bot]
1cef7f29e9 [autofix.ci] apply automated fixes (attempt 3/3) 2025-09-04 17:44:24 +00:00
autofix-ci[bot]
c38d674cbf [autofix.ci] apply automated fixes (attempt 2/3) 2025-09-04 17:42:40 +00:00
autofix-ci[bot]
e75b9aa01b [autofix.ci] apply automated fixes 2025-09-04 17:40:37 +00:00
Ciro Spaciari
8ad5f0bb57 more generic 2025-09-04 10:37:51 -07:00
autofix-ci[bot]
f51aed3f40 [autofix.ci] apply automated fixes 2025-09-04 10:37:51 -07:00
Ciro Spaciari
b1990a484f ok 2025-09-04 10:37:51 -07:00
5 changed files with 381 additions and 23 deletions

View File

@@ -0,0 +1,134 @@
const Duplex = require("internal/streams/duplex");
const kWrappedSocketWritable = Symbol("WrappedSocketWritable");
class WrappedSocket extends Duplex {
#fetchBody: ReadableStream<Uint8Array> | null = null;
#resolveNextRead: ((value: Uint8Array | null) => void) | null = null;
#queue: { value: Buffer | null; cb: () => void }[] = [];
#ended: boolean = false;
#res: any;
#emitClose: () => void;
constructor(fetchBody: ReadableStream<Uint8Array> | null, res: any, emitClose: () => void) {
super();
this.#fetchBody = fetchBody;
this.#res = res;
this.#emitClose = emitClose;
}
#write(value, cb) {
if (this.#ended) {
cb();
return;
}
if (this.#resolveNextRead) {
this.#resolveNextRead(value);
this.#resolveNextRead = null;
cb();
} else {
this.#queue.push({ value, cb });
}
}
setNoDelay() {
return this;
}
setKeepAlive() {
return this;
}
setTimeout() {
return this;
}
#end() {
if (this.#ended) return;
this.#ended = true;
this.#res.complete = true;
this.#res._dump();
this.#emitClose();
}
async *[kWrappedSocketWritable]() {
while (true) {
if (this.#queue.length === 0) {
if (this.listenerCount("drain") > 0) {
this.emit("drain");
}
const { promise, resolve } = Promise.withResolvers();
this.#resolveNextRead = resolve;
const value = await promise;
if (value === null) {
this.#end();
break;
}
yield value;
}
if (this.#queue.length > 0) {
const { value, cb } = this.#queue.shift();
if (value !== null) {
yield value;
cb();
} else {
this.#end();
cb();
break;
}
}
}
}
async #consumeBody() {
try {
if (this.#fetchBody) {
const reader = await this.#fetchBody.getReader();
this.#fetchBody = null;
while (true) {
const { done, value } = await reader.read();
if (done) break;
this.push(value);
}
this.push(null);
}
} catch (e) {
if (e.code === "ECONNRESET") {
// end the readable side gracefully because the server closed the connection
this.push(null);
} else {
this.destroy(e);
}
}
}
// Writable side proxies to inner writable
_write(chunk, enc, cb) {
let buffer = chunk;
if (!Buffer.isBuffer(buffer)) {
buffer = Buffer.from(buffer, enc);
}
this.#write(buffer, cb);
}
_final(cb) {
this.#write(null, cb);
this.#ended = true;
}
_read(_size) {
this.#consumeBody();
}
_destroy(err, cb) {
if (!this.readableEnded) {
this.push(null);
}
this.#write(null, cb);
cb(err);
}
}
export default {
WrappedSocket,
kWrappedSocketWritable,
};

View File

@@ -1,5 +1,4 @@
const { isIP, isIPv6 } = require("internal/net/isIP");
const { checkIsHttpToken, validateFunction, validateInteger, validateBoolean } = require("internal/validators");
const { urlToHttpOptions } = require("internal/url");
const { isValidTLSArray } = require("internal/tls");
@@ -268,7 +267,7 @@ function ClientRequest(input, options, cb) {
const method = this[kMethod];
let keepalive = true;
const agentKeepalive = this[kAgent]?.keepAlive;
const agentKeepalive = this[kAgent]?.keepalive;
if (agentKeepalive !== undefined) {
keepalive = agentKeepalive;
}
@@ -313,31 +312,60 @@ function ClientRequest(input, options, cb) {
decompress: false,
keepalive,
};
const upgradeHeader = fetchOptions?.headers?.upgrade;
const isUpgrade = typeof upgradeHeader === "string" && upgradeHeader !== "h2" && upgradeHeader !== "h2c";
let keepOpen = false;
// no body and not finished
const isDuplex = customBody === undefined && !this.finished;
const isDuplex = isUpgrade || (customBody === undefined && !this.finished);
if (isDuplex) {
fetchOptions.duplex = "half";
keepOpen = true;
}
if (method !== "GET" && method !== "HEAD" && method !== "OPTIONS") {
let upgradedResponse: ((socket: any) => void) | undefined = undefined;
let kWrappedSocketWritable: symbol | undefined;
if (isUpgrade) {
const { promise: upgradedPromise, resolve } = Promise.withResolvers<any>();
upgradedResponse = resolve;
let socketIter: AsyncIterator<any> | null = null;
fetchOptions.body = new ReadableStream({
async pull(controller) {
if (!socketIter) {
const socket = await upgradedPromise;
if (!socket) {
controller.close();
return;
}
socketIter = socket[kWrappedSocketWritable!]()[Symbol.asyncIterator]();
}
const { value, done } = await socketIter!.next();
if (done) {
controller.close();
} else {
controller.enqueue(value);
}
},
});
} else if (method !== "GET" && method !== "HEAD" && method !== "OPTIONS") {
const self = this;
if (customBody !== undefined) {
fetchOptions.body = customBody;
} else if (isDuplex) {
fetchOptions.body = async function* () {
while (self[kBodyChunks]?.length > 0) {
yield self[kBodyChunks].shift();
}
if (self[kBodyChunks]?.length === 0) {
self.emit("drain");
}
while (!self.finished) {
yield await new Promise(resolve => {
fetchOptions.body = new ReadableStream({
async pull(controller) {
while (self[kBodyChunks]?.length > 0) {
controller.enqueue(self[kBodyChunks].shift());
}
if (self[kBodyChunks]?.length === 0) {
self.emit("drain");
}
if (self.finished) {
handleResponse?.();
controller.close();
return;
}
const chunk = await new Promise<any>(resolve => {
resolveNextChunk = end => {
resolveNextChunk = undefined;
if (end) {
@@ -347,14 +375,18 @@ function ClientRequest(input, options, cb) {
}
};
});
if (chunk !== null && chunk !== undefined) {
controller.enqueue(chunk);
}
if (self[kBodyChunks]?.length === 0) {
self.emit("drain");
}
}
handleResponse?.();
};
if (self.finished) {
handleResponse?.();
controller.close();
}
},
});
}
}
@@ -379,6 +411,7 @@ function ClientRequest(input, options, cb) {
//@ts-ignore
this[kFetchRequest] = fetch(url, fetchOptions).then(response => {
if (this.aborted) {
upgradedResponse?.(null);
maybeEmitClose();
return;
}
@@ -423,6 +456,21 @@ function ClientRequest(input, options, cb) {
return;
}
try {
if (isUpgrade) {
if (response.status === 101) {
const {
WrappedSocket,
kWrappedSocketWritable: kSymbol,
} = require("internal/http/WrappedUpgradeSocket");
kWrappedSocketWritable = kSymbol;
const socket = new WrappedSocket(response.body, res, maybeEmitClose);
upgradedResponse!(socket);
this.socket = socket;
this.emit("upgrade", res, socket, Buffer.alloc(0));
return;
}
upgradedResponse!(null);
}
if (self.aborted || !self.emit("response", res)) {
res._dump();
}

View File

@@ -59,7 +59,7 @@ const sendHelper = $newZigFunction("node_cluster_binding.zig", "sendHelperChild"
const kServerResponse = Symbol("ServerResponse");
const kRejectNonStandardBodyWrites = Symbol("kRejectNonStandardBodyWrites");
const GlobalPromise = globalThis.Promise;
const kEmptyBuffer = Buffer.alloc(0);
const ObjectKeys = Object.keys;
const MathMin = Math.min;
@@ -537,7 +537,7 @@ Server.prototype[kRealListen] = function (tls, port, host, socketPath, reusePort
socket[kEnableStreaming](true);
const { promise, resolve } = $newPromiseCapability(Promise);
socket.once("close", resolve);
server.emit("connect", http_req, socket, kEmptyBuffer);
server.emit("connect", http_req, socket, Buffer.alloc(0));
return promise;
} else {
// Node.js will close the socket and will NOT respond with 400 Bad Request
@@ -599,7 +599,7 @@ Server.prototype[kRealListen] = function (tls, port, host, socketPath, reusePort
http_res.end();
socket.destroy();
} else if (is_upgrade) {
server.emit("upgrade", http_req, socket, kEmptyBuffer);
server.emit("upgrade", http_req, socket, Buffer.alloc(0));
if (!socket._httpMessage) {
if (canUseInternalAssignSocket) {
// ~10% performance improvement in JavaScriptCore due to avoiding .once("close", ...) and removing a listener

View File

@@ -0,0 +1,99 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
// Verify that the 'upgrade' header causes an 'upgrade' event to be emitted to
// the HTTP client. This test uses a raw TCP server to better control server
// behavior.
const common = require('../common');
const assert = require('assert');
const http = require('http');
const net = require('net');
const Countdown = require('../common/countdown');
const expectedRecvData = 'nurtzo';
// Create a TCP server
const server = net.createServer(function(c) {
c.on('data', function(d) {
c.write('HTTP/1.1 101\r\n');
c.write('hello: world\r\n');
c.write('connection: upgrade\r\n');
c.write('upgrade: websocket\r\n');
c.write('\r\n');
c.write(expectedRecvData);
});
c.on('end', function() {
c.end();
});
});
server.listen(0, "127.0.0.1", common.mustCall(function() {
const port = this.address().port;
const headers = [
{
connection: 'upgrade',
upgrade: 'websocket'
},
[
['Host', 'echo.websocket.org'],
['Connection', 'Upgrade'],
['Upgrade', 'websocket'],
['Origin', 'http://www.websocket.org'],
],
];
const countdown = new Countdown(headers.length, () => server.close());
headers.forEach(function(h) {
const req = http.get({
port: port,
hostname: "127.0.0.1",
headers: h
});
let sawUpgrade = false;
req.on('upgrade', common.mustCall(function(res, socket, upgradeHead) {
sawUpgrade = true;
let recvData = upgradeHead;
socket.on('data', function(d) {
recvData += d;
});
socket.on('close', common.mustCall(function() {
assert.strictEqual(recvData.toString(), expectedRecvData);
}));
const expectedHeaders = {
hello: 'world',
connection: 'upgrade',
upgrade: 'websocket'
};
assert.deepStrictEqual(res.headers, expectedHeaders);
socket.end();
countdown.dec();
}));
req.on('close', common.mustCall(function() {
assert.strictEqual(sawUpgrade, true);
}));
});
}));

View File

@@ -1,4 +1,5 @@
import { describe, expect, test } from "bun:test";
import http from "http";
import { decodeFrames, encodeCloseFrame, encodeTextFrame, upgradeHeaders } from "./websocket.helpers";
describe("fetch upgrade", () => {
@@ -60,4 +61,80 @@ describe("fetch upgrade", () => {
expect(serverMessages).toEqual(["hello", "world", "bye", "close"]);
expect(clientMessages).toEqual(["Hello World", "close"]);
});
test("should upgrade to websocket using http.request", async () => {
const serverMessages: string[] = [];
using server = Bun.serve({
port: 0,
fetch(req) {
if (server.upgrade(req)) return;
return new Response("Hello World");
},
websocket: {
open(ws) {
ws.send("Hello World");
},
message(ws, message) {
serverMessages.push(message as string);
},
close(ws) {
serverMessages.push("close");
},
},
});
const req = http.request(
{
port: server.url.port,
hostname: server.url.hostname,
headers: upgradeHeaders(),
},
res => {
expect.unreachable("should not call response callback");
},
);
const clientMessages: string[] = [];
const { promise, resolve, reject } = Promise.withResolvers<void>();
req.on("upgrade", (req, socket, head) => {
try {
expect(req.statusCode).toBe(101);
expect(req.headers.upgrade).toBe("websocket");
expect(req.headers["sec-websocket-accept"]).toBeDefined();
expect(req.headers.connection).toBe("Upgrade");
expect(head).toBeDefined();
expect(Buffer.isBuffer(head)).toBe(true);
function onData(data: Buffer) {
for (const msg of decodeFrames(data)) {
if (typeof msg === "string") {
clientMessages.push(msg);
} else {
clientMessages.push(msg.type);
if (msg.type === "close") {
socket.end();
resolve();
}
}
}
}
if (head.length > 0) {
onData(head);
}
socket.on("data", onData);
socket.write(encodeTextFrame("hello"));
socket.write(encodeTextFrame("world"));
socket.write(encodeTextFrame("bye"));
socket.write(encodeCloseFrame());
} catch (err) {
reject(err);
}
});
req.end();
await promise;
expect(serverMessages).toEqual(["hello", "world", "bye", "close"]);
expect(clientMessages).toEqual(["Hello World", "close"]);
});
});