mirror of
https://github.com/oven-sh/bun
synced 2026-02-17 14:22:01 +00:00
Compare commits
72 Commits
claude/fix
...
ciro/fetch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
24751ed940 | ||
|
|
27f08bc305 | ||
|
|
e61c23e26b | ||
|
|
65bb8cb36a | ||
|
|
a706902996 | ||
|
|
2836cd49aa | ||
|
|
9ffb9763f5 | ||
|
|
f02ebff5d8 | ||
|
|
f358f77512 | ||
|
|
28819a7e49 | ||
|
|
2c719f7d09 | ||
|
|
fbee1c8e74 | ||
|
|
8fc82cf22a | ||
|
|
06922bcfae | ||
|
|
4f0866d7b8 | ||
|
|
e475055148 | ||
|
|
58330503f2 | ||
|
|
a2bfb02826 | ||
|
|
f052b2d944 | ||
|
|
6333dd44cb | ||
|
|
5de400ee0c | ||
|
|
16b632e249 | ||
|
|
cedbe28d01 | ||
|
|
7f36e5c0ac | ||
|
|
aaae722c0d | ||
|
|
b7ce235e93 | ||
|
|
e10c995139 | ||
|
|
b05964def4 | ||
|
|
4b61f99adf | ||
|
|
6ce1e14e5c | ||
|
|
27ec84905d | ||
|
|
b0dac57298 | ||
|
|
c33fc879dc | ||
|
|
36a2fb6093 | ||
|
|
2560c08922 | ||
|
|
4f0d126d75 | ||
|
|
3a5c3ac657 | ||
|
|
e782f9ebbd | ||
|
|
3a10be5191 | ||
|
|
b49e20c193 | ||
|
|
19491b9db3 | ||
|
|
79bbada1ed | ||
|
|
ccf021fab9 | ||
|
|
8df4827833 | ||
|
|
0609fa5122 | ||
|
|
ed21db9414 | ||
|
|
8727416808 | ||
|
|
c7f6623878 | ||
|
|
1503715c0e | ||
|
|
4cbe315002 | ||
|
|
fbdde3a89c | ||
|
|
ed037cece0 | ||
|
|
e2767b970d | ||
|
|
68c5a293eb | ||
|
|
6b5e41a0b3 | ||
|
|
50bc82a97e | ||
|
|
345666b194 | ||
|
|
01785cf3b7 | ||
|
|
56f1d991cc | ||
|
|
0853c555b2 | ||
|
|
ae42b8f045 | ||
|
|
e5abcf45ef | ||
|
|
0e8f4ba114 | ||
|
|
efda53ea6f | ||
|
|
d561b17b46 | ||
|
|
1dc1d4795e | ||
|
|
1cef7f29e9 | ||
|
|
c38d674cbf | ||
|
|
e75b9aa01b | ||
|
|
8ad5f0bb57 | ||
|
|
f51aed3f40 | ||
|
|
b1990a484f |
134
src/js/internal/http/WrappedUpgradeSocket.ts
Normal file
134
src/js/internal/http/WrappedUpgradeSocket.ts
Normal file
@@ -0,0 +1,134 @@
|
||||
const Duplex = require("internal/streams/duplex");
|
||||
|
||||
const kWrappedSocketWritable = Symbol("WrappedSocketWritable");
|
||||
|
||||
class WrappedSocket extends Duplex {
|
||||
#fetchBody: ReadableStream<Uint8Array> | null = null;
|
||||
#resolveNextRead: ((value: Uint8Array | null) => void) | null = null;
|
||||
#queue: { value: Buffer | null; cb: () => void }[] = [];
|
||||
#ended: boolean = false;
|
||||
#res: any;
|
||||
#emitClose: () => void;
|
||||
constructor(fetchBody: ReadableStream<Uint8Array> | null, res: any, emitClose: () => void) {
|
||||
super();
|
||||
this.#fetchBody = fetchBody;
|
||||
this.#res = res;
|
||||
this.#emitClose = emitClose;
|
||||
}
|
||||
|
||||
#write(value, cb) {
|
||||
if (this.#ended) {
|
||||
cb();
|
||||
return;
|
||||
}
|
||||
if (this.#resolveNextRead) {
|
||||
this.#resolveNextRead(value);
|
||||
this.#resolveNextRead = null;
|
||||
cb();
|
||||
} else {
|
||||
this.#queue.push({ value, cb });
|
||||
}
|
||||
}
|
||||
|
||||
setNoDelay() {
|
||||
return this;
|
||||
}
|
||||
|
||||
setKeepAlive() {
|
||||
return this;
|
||||
}
|
||||
|
||||
setTimeout() {
|
||||
return this;
|
||||
}
|
||||
|
||||
#end() {
|
||||
if (this.#ended) return;
|
||||
this.#ended = true;
|
||||
this.#res.complete = true;
|
||||
this.#res._dump();
|
||||
this.#emitClose();
|
||||
}
|
||||
|
||||
async *[kWrappedSocketWritable]() {
|
||||
while (true) {
|
||||
if (this.#queue.length === 0) {
|
||||
if (this.listenerCount("drain") > 0) {
|
||||
this.emit("drain");
|
||||
}
|
||||
const { promise, resolve } = Promise.withResolvers();
|
||||
this.#resolveNextRead = resolve;
|
||||
const value = await promise;
|
||||
if (value === null) {
|
||||
this.#end();
|
||||
break;
|
||||
}
|
||||
yield value;
|
||||
}
|
||||
if (this.#queue.length > 0) {
|
||||
const { value, cb } = this.#queue.shift();
|
||||
if (value !== null) {
|
||||
yield value;
|
||||
cb();
|
||||
} else {
|
||||
this.#end();
|
||||
cb();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async #consumeBody() {
|
||||
try {
|
||||
if (this.#fetchBody) {
|
||||
const reader = await this.#fetchBody.getReader();
|
||||
this.#fetchBody = null;
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
this.push(value);
|
||||
}
|
||||
this.push(null);
|
||||
}
|
||||
} catch (e) {
|
||||
if (e.code === "ECONNRESET") {
|
||||
// end the readable side gracefully because the server closed the connection
|
||||
this.push(null);
|
||||
} else {
|
||||
this.destroy(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Writable side proxies to inner writable
|
||||
_write(chunk, enc, cb) {
|
||||
let buffer = chunk;
|
||||
if (!Buffer.isBuffer(buffer)) {
|
||||
buffer = Buffer.from(buffer, enc);
|
||||
}
|
||||
this.#write(buffer, cb);
|
||||
}
|
||||
|
||||
_final(cb) {
|
||||
this.#write(null, cb);
|
||||
this.#ended = true;
|
||||
}
|
||||
|
||||
_read(_size) {
|
||||
this.#consumeBody();
|
||||
}
|
||||
|
||||
_destroy(err, cb) {
|
||||
if (!this.readableEnded) {
|
||||
this.push(null);
|
||||
}
|
||||
this.#write(null, cb);
|
||||
cb(err);
|
||||
}
|
||||
}
|
||||
|
||||
export default {
|
||||
WrappedSocket,
|
||||
kWrappedSocketWritable,
|
||||
};
|
||||
@@ -1,5 +1,4 @@
|
||||
const { isIP, isIPv6 } = require("internal/net/isIP");
|
||||
|
||||
const { checkIsHttpToken, validateFunction, validateInteger, validateBoolean } = require("internal/validators");
|
||||
const { urlToHttpOptions } = require("internal/url");
|
||||
const { isValidTLSArray } = require("internal/tls");
|
||||
@@ -268,7 +267,7 @@ function ClientRequest(input, options, cb) {
|
||||
const method = this[kMethod];
|
||||
|
||||
let keepalive = true;
|
||||
const agentKeepalive = this[kAgent]?.keepAlive;
|
||||
const agentKeepalive = this[kAgent]?.keepalive;
|
||||
if (agentKeepalive !== undefined) {
|
||||
keepalive = agentKeepalive;
|
||||
}
|
||||
@@ -313,31 +312,60 @@ function ClientRequest(input, options, cb) {
|
||||
decompress: false,
|
||||
keepalive,
|
||||
};
|
||||
const upgradeHeader = fetchOptions?.headers?.upgrade;
|
||||
const isUpgrade = typeof upgradeHeader === "string" && upgradeHeader !== "h2" && upgradeHeader !== "h2c";
|
||||
let keepOpen = false;
|
||||
// no body and not finished
|
||||
const isDuplex = customBody === undefined && !this.finished;
|
||||
const isDuplex = isUpgrade || (customBody === undefined && !this.finished);
|
||||
|
||||
if (isDuplex) {
|
||||
fetchOptions.duplex = "half";
|
||||
keepOpen = true;
|
||||
}
|
||||
|
||||
if (method !== "GET" && method !== "HEAD" && method !== "OPTIONS") {
|
||||
let upgradedResponse: ((socket: any) => void) | undefined = undefined;
|
||||
let kWrappedSocketWritable: symbol | undefined;
|
||||
if (isUpgrade) {
|
||||
const { promise: upgradedPromise, resolve } = Promise.withResolvers<any>();
|
||||
upgradedResponse = resolve;
|
||||
let socketIter: AsyncIterator<any> | null = null;
|
||||
fetchOptions.body = new ReadableStream({
|
||||
async pull(controller) {
|
||||
if (!socketIter) {
|
||||
const socket = await upgradedPromise;
|
||||
if (!socket) {
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
socketIter = socket[kWrappedSocketWritable!]()[Symbol.asyncIterator]();
|
||||
}
|
||||
const { value, done } = await socketIter!.next();
|
||||
if (done) {
|
||||
controller.close();
|
||||
} else {
|
||||
controller.enqueue(value);
|
||||
}
|
||||
},
|
||||
});
|
||||
} else if (method !== "GET" && method !== "HEAD" && method !== "OPTIONS") {
|
||||
const self = this;
|
||||
if (customBody !== undefined) {
|
||||
fetchOptions.body = customBody;
|
||||
} else if (isDuplex) {
|
||||
fetchOptions.body = async function* () {
|
||||
while (self[kBodyChunks]?.length > 0) {
|
||||
yield self[kBodyChunks].shift();
|
||||
}
|
||||
|
||||
if (self[kBodyChunks]?.length === 0) {
|
||||
self.emit("drain");
|
||||
}
|
||||
|
||||
while (!self.finished) {
|
||||
yield await new Promise(resolve => {
|
||||
fetchOptions.body = new ReadableStream({
|
||||
async pull(controller) {
|
||||
while (self[kBodyChunks]?.length > 0) {
|
||||
controller.enqueue(self[kBodyChunks].shift());
|
||||
}
|
||||
if (self[kBodyChunks]?.length === 0) {
|
||||
self.emit("drain");
|
||||
}
|
||||
if (self.finished) {
|
||||
handleResponse?.();
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
const chunk = await new Promise<any>(resolve => {
|
||||
resolveNextChunk = end => {
|
||||
resolveNextChunk = undefined;
|
||||
if (end) {
|
||||
@@ -347,14 +375,18 @@ function ClientRequest(input, options, cb) {
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
if (chunk !== null && chunk !== undefined) {
|
||||
controller.enqueue(chunk);
|
||||
}
|
||||
if (self[kBodyChunks]?.length === 0) {
|
||||
self.emit("drain");
|
||||
}
|
||||
}
|
||||
|
||||
handleResponse?.();
|
||||
};
|
||||
if (self.finished) {
|
||||
handleResponse?.();
|
||||
controller.close();
|
||||
}
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -379,6 +411,7 @@ function ClientRequest(input, options, cb) {
|
||||
//@ts-ignore
|
||||
this[kFetchRequest] = fetch(url, fetchOptions).then(response => {
|
||||
if (this.aborted) {
|
||||
upgradedResponse?.(null);
|
||||
maybeEmitClose();
|
||||
return;
|
||||
}
|
||||
@@ -423,6 +456,21 @@ function ClientRequest(input, options, cb) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (isUpgrade) {
|
||||
if (response.status === 101) {
|
||||
const {
|
||||
WrappedSocket,
|
||||
kWrappedSocketWritable: kSymbol,
|
||||
} = require("internal/http/WrappedUpgradeSocket");
|
||||
kWrappedSocketWritable = kSymbol;
|
||||
const socket = new WrappedSocket(response.body, res, maybeEmitClose);
|
||||
upgradedResponse!(socket);
|
||||
this.socket = socket;
|
||||
this.emit("upgrade", res, socket, Buffer.alloc(0));
|
||||
return;
|
||||
}
|
||||
upgradedResponse!(null);
|
||||
}
|
||||
if (self.aborted || !self.emit("response", res)) {
|
||||
res._dump();
|
||||
}
|
||||
|
||||
@@ -59,7 +59,7 @@ const sendHelper = $newZigFunction("node_cluster_binding.zig", "sendHelperChild"
|
||||
const kServerResponse = Symbol("ServerResponse");
|
||||
const kRejectNonStandardBodyWrites = Symbol("kRejectNonStandardBodyWrites");
|
||||
const GlobalPromise = globalThis.Promise;
|
||||
const kEmptyBuffer = Buffer.alloc(0);
|
||||
|
||||
const ObjectKeys = Object.keys;
|
||||
const MathMin = Math.min;
|
||||
|
||||
@@ -537,7 +537,7 @@ Server.prototype[kRealListen] = function (tls, port, host, socketPath, reusePort
|
||||
socket[kEnableStreaming](true);
|
||||
const { promise, resolve } = $newPromiseCapability(Promise);
|
||||
socket.once("close", resolve);
|
||||
server.emit("connect", http_req, socket, kEmptyBuffer);
|
||||
server.emit("connect", http_req, socket, Buffer.alloc(0));
|
||||
return promise;
|
||||
} else {
|
||||
// Node.js will close the socket and will NOT respond with 400 Bad Request
|
||||
@@ -599,7 +599,7 @@ Server.prototype[kRealListen] = function (tls, port, host, socketPath, reusePort
|
||||
http_res.end();
|
||||
socket.destroy();
|
||||
} else if (is_upgrade) {
|
||||
server.emit("upgrade", http_req, socket, kEmptyBuffer);
|
||||
server.emit("upgrade", http_req, socket, Buffer.alloc(0));
|
||||
if (!socket._httpMessage) {
|
||||
if (canUseInternalAssignSocket) {
|
||||
// ~10% performance improvement in JavaScriptCore due to avoiding .once("close", ...) and removing a listener
|
||||
|
||||
99
test/js/node/test/parallel/test-http-upgrade-client.js
Normal file
99
test/js/node/test/parallel/test-http-upgrade-client.js
Normal file
@@ -0,0 +1,99 @@
|
||||
// Copyright Joyent, Inc. and other Node contributors.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the
|
||||
// "Software"), to deal in the Software without restriction, including
|
||||
// without limitation the rights to use, copy, modify, merge, publish,
|
||||
// distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
// persons to whom the Software is furnished to do so, subject to the
|
||||
// following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included
|
||||
// in all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
// USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
'use strict';
|
||||
// Verify that the 'upgrade' header causes an 'upgrade' event to be emitted to
|
||||
// the HTTP client. This test uses a raw TCP server to better control server
|
||||
// behavior.
|
||||
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
|
||||
const http = require('http');
|
||||
const net = require('net');
|
||||
const Countdown = require('../common/countdown');
|
||||
|
||||
const expectedRecvData = 'nurtzo';
|
||||
|
||||
// Create a TCP server
|
||||
const server = net.createServer(function(c) {
|
||||
c.on('data', function(d) {
|
||||
c.write('HTTP/1.1 101\r\n');
|
||||
c.write('hello: world\r\n');
|
||||
c.write('connection: upgrade\r\n');
|
||||
c.write('upgrade: websocket\r\n');
|
||||
c.write('\r\n');
|
||||
c.write(expectedRecvData);
|
||||
});
|
||||
|
||||
c.on('end', function() {
|
||||
c.end();
|
||||
});
|
||||
});
|
||||
|
||||
server.listen(0, "127.0.0.1", common.mustCall(function() {
|
||||
const port = this.address().port;
|
||||
const headers = [
|
||||
{
|
||||
connection: 'upgrade',
|
||||
upgrade: 'websocket'
|
||||
},
|
||||
[
|
||||
['Host', 'echo.websocket.org'],
|
||||
['Connection', 'Upgrade'],
|
||||
['Upgrade', 'websocket'],
|
||||
['Origin', 'http://www.websocket.org'],
|
||||
],
|
||||
];
|
||||
const countdown = new Countdown(headers.length, () => server.close());
|
||||
|
||||
headers.forEach(function(h) {
|
||||
const req = http.get({
|
||||
port: port,
|
||||
hostname: "127.0.0.1",
|
||||
headers: h
|
||||
});
|
||||
let sawUpgrade = false;
|
||||
req.on('upgrade', common.mustCall(function(res, socket, upgradeHead) {
|
||||
sawUpgrade = true;
|
||||
let recvData = upgradeHead;
|
||||
socket.on('data', function(d) {
|
||||
recvData += d;
|
||||
});
|
||||
|
||||
socket.on('close', common.mustCall(function() {
|
||||
assert.strictEqual(recvData.toString(), expectedRecvData);
|
||||
}));
|
||||
|
||||
const expectedHeaders = {
|
||||
hello: 'world',
|
||||
connection: 'upgrade',
|
||||
upgrade: 'websocket'
|
||||
};
|
||||
assert.deepStrictEqual(res.headers, expectedHeaders);
|
||||
socket.end();
|
||||
countdown.dec();
|
||||
}));
|
||||
req.on('close', common.mustCall(function() {
|
||||
assert.strictEqual(sawUpgrade, true);
|
||||
}));
|
||||
});
|
||||
}));
|
||||
@@ -1,4 +1,5 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import http from "http";
|
||||
import { decodeFrames, encodeCloseFrame, encodeTextFrame, upgradeHeaders } from "./websocket.helpers";
|
||||
|
||||
describe("fetch upgrade", () => {
|
||||
@@ -60,4 +61,80 @@ describe("fetch upgrade", () => {
|
||||
expect(serverMessages).toEqual(["hello", "world", "bye", "close"]);
|
||||
expect(clientMessages).toEqual(["Hello World", "close"]);
|
||||
});
|
||||
|
||||
test("should upgrade to websocket using http.request", async () => {
|
||||
const serverMessages: string[] = [];
|
||||
using server = Bun.serve({
|
||||
port: 0,
|
||||
fetch(req) {
|
||||
if (server.upgrade(req)) return;
|
||||
return new Response("Hello World");
|
||||
},
|
||||
websocket: {
|
||||
open(ws) {
|
||||
ws.send("Hello World");
|
||||
},
|
||||
message(ws, message) {
|
||||
serverMessages.push(message as string);
|
||||
},
|
||||
close(ws) {
|
||||
serverMessages.push("close");
|
||||
},
|
||||
},
|
||||
});
|
||||
const req = http.request(
|
||||
{
|
||||
port: server.url.port,
|
||||
hostname: server.url.hostname,
|
||||
headers: upgradeHeaders(),
|
||||
},
|
||||
res => {
|
||||
expect.unreachable("should not call response callback");
|
||||
},
|
||||
);
|
||||
const clientMessages: string[] = [];
|
||||
|
||||
const { promise, resolve, reject } = Promise.withResolvers<void>();
|
||||
|
||||
req.on("upgrade", (req, socket, head) => {
|
||||
try {
|
||||
expect(req.statusCode).toBe(101);
|
||||
expect(req.headers.upgrade).toBe("websocket");
|
||||
expect(req.headers["sec-websocket-accept"]).toBeDefined();
|
||||
expect(req.headers.connection).toBe("Upgrade");
|
||||
expect(head).toBeDefined();
|
||||
expect(Buffer.isBuffer(head)).toBe(true);
|
||||
function onData(data: Buffer) {
|
||||
for (const msg of decodeFrames(data)) {
|
||||
if (typeof msg === "string") {
|
||||
clientMessages.push(msg);
|
||||
} else {
|
||||
clientMessages.push(msg.type);
|
||||
if (msg.type === "close") {
|
||||
socket.end();
|
||||
resolve();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (head.length > 0) {
|
||||
onData(head);
|
||||
}
|
||||
socket.on("data", onData);
|
||||
|
||||
socket.write(encodeTextFrame("hello"));
|
||||
socket.write(encodeTextFrame("world"));
|
||||
socket.write(encodeTextFrame("bye"));
|
||||
socket.write(encodeCloseFrame());
|
||||
} catch (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
|
||||
req.end();
|
||||
await promise;
|
||||
|
||||
expect(serverMessages).toEqual(["hello", "world", "bye", "close"]);
|
||||
expect(clientMessages).toEqual(["Hello World", "close"]);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user