polyfills: hono serve rewrite

This commit is contained in:
jhmaster2000
2023-12-11 02:09:48 -03:00
parent b8dcf2caf8
commit fb2dfb2337
6 changed files with 148 additions and 160 deletions

Binary file not shown.

View File

@@ -32,6 +32,7 @@
"typescript": "^5.2.2"
},
"dependencies": {
"@hono/node-server": "^1.3.1",
"argon2": "^0.31.2",
"bcryptjs": "^2.4.3",
"better-sqlite3": "^9.1.1",
@@ -39,6 +40,7 @@
"chalk": "^5.3.0",
"dateformat": "^5.0.3",
"expect": "^29.7.0",
"hono": "^3.11.4",
"html-rewriter-wasm": "^0.4.1",
"isomorphic-ws": "^5.0.0",
"jest-each": "^29.7.0",

View File

@@ -16,7 +16,7 @@ if (typeof process === 'object' && process !== null) {
process.versions.c_ares = '0e7a5dee0fbb04080750cf6eabbe89d8bae87faa' satisfies Process['versions'][string];
process.versions.zig = '0.12.0-dev.1604+caae40c21' satisfies Process['versions'][string];
process.versions.bun = '1.0.13' satisfies Process['versions'][string];
Reflect.set(process, 'revision', '5e0160552a0b63087cdf1cd5dd62f69fdca5d875' satisfies Process['revision']);
Reflect.set(process, 'revision', 'b8dcf2caf869bd911a3cb5df77f58fdce20bc185' satisfies Process['revision']);
/** @end_generated_code */
// Doesn't work on Windows sadly

View File

@@ -48,7 +48,7 @@ export const main = path.resolve(process.cwd(), process.argv[1] ?? 'repl') satis
//? These are automatically updated on build by tools/updateversions.ts, do not edit manually.
export const version = '1.0.13' satisfies typeof Bun.version;
export const revision = '5e0160552a0b63087cdf1cd5dd62f69fdca5d875' satisfies typeof Bun.revision;
export const revision = 'b8dcf2caf869bd911a3cb5df77f58fdce20bc185' satisfies typeof Bun.revision;
export const gc = (
globalThis.gc

View File

@@ -1,28 +1,14 @@
/// <reference types="node" />
import type {
Server as BunServer, Serve, TLSServeOptions, UnixTLSServeOptions, TLSWebSocketServeOptions, UnixTLSWebSocketServeOptions, ArrayBufferView, SocketAddress, WebSocketHandler, ServerWebSocket, WebSocketCompressor
Serve, TLSServeOptions, UnixTLSServeOptions, TLSWebSocketServeOptions, UnixTLSWebSocketServeOptions,
Server as BunServer, ArrayBufferView, SocketAddress, WebSocketHandler, ServerWebSocket, WebSocketCompressor
} from 'bun';
import { serve as honoServe } from '@hono/node-server';
import { WebSocketServer, type AddressInfo } from 'ws';
import { createHash } from 'node:crypto';
import uws from 'uWebSockets.js';
type uwsInternalField = { res: uws.HttpResponse; req: uws.HttpRequest; };
type uwsUpgradableRequest = { secwskey: string; secwsprotocol: string; secwsextensions: string; context: uws.us_socket_context_t; };
const uwsInternalFieldSymbol = Symbol('bun-polyfills.serve.uwsInternalField');
const uwsUpgradableRequestSymbol = Symbol('bun-polyfills.serve.uwsUpgradableRequest');
const wsCompressors: Record<WebSocketCompressor, number> = {
'128KB': uws.DEDICATED_COMPRESSOR_128KB,
'16KB': uws.DEDICATED_COMPRESSOR_16KB,
'256KB': uws.DEDICATED_COMPRESSOR_256KB,
'32KB': uws.DEDICATED_COMPRESSOR_32KB,
'3KB': uws.DEDICATED_COMPRESSOR_3KB,
'4KB': uws.DEDICATED_COMPRESSOR_4KB,
'64KB': uws.DEDICATED_COMPRESSOR_64KB,
'8KB': uws.DEDICATED_COMPRESSOR_8KB,
dedicated: uws.DEDICATED_COMPRESSOR_32KB,
disable: uws.DISABLED,
shared: uws.SHARED_COMPRESSOR,
} as const;
import fs from 'node:fs';
import http from 'node:http';
import { requestRemoteIPSymbol, requestUpgradedSymbol, toWebRequest } from '../../utils/webconv.js';
export function serve<T>(options: Serve<T>): BunServer {
return new Server(options);
@@ -55,100 +41,59 @@ class Server<T = undefined> implements BunServer {
this.#onRequest = options.fetch;
if (!this.#onRequest) throw new TypeError('Expected fetch() to be a function');
this.#uws = uws[tls ? 'SSLApp' : 'App']({
ca_file_name: tls?.ca instanceof Blob ? tls.ca.name
: tls?.ca instanceof Array ? (tls.ca[0] instanceof Blob ? tls.ca[0].name : tls.ca[0]) : tls?.ca,
cert_file_name: tls?.cert instanceof Blob ? tls.cert.name
: tls?.cert instanceof Array ? (tls.cert[0] instanceof Blob ? tls.cert[0].name : tls.cert[0]) : tls?.cert,
dh_params_file_name: tls?.dhParamsFile,
key_file_name: tls?.key instanceof Blob ? tls.key.name
: tls?.key instanceof Array ? (tls.key[0] instanceof Blob ? tls.key[0].name : tls.key[0]) : tls?.key,
passphrase: tls?.passphrase,
ssl_ciphers: tls?.secureOptions?.toString(),
ssl_prefer_low_memory_usage: tls?.lowMemoryMode,
if (tls?.ca instanceof Blob) tls.ca = tls.ca.name;
if (tls?.ca instanceof Array) tls.ca = tls.ca.map((ca) => ca instanceof Blob ? ca.name! : ca);
if (tls?.cert instanceof Blob) tls.cert = tls.cert.name;
if (tls?.cert instanceof Array) tls.cert = tls.cert.map((cert) => cert instanceof Blob ? cert.name! : cert);
if (tls?.key instanceof Blob) tls.key = tls.key.name;
if (tls?.key instanceof Array) tls.key = tls.key.map((key) => key instanceof Blob ? key.name! : key);
this.#server = honoServe({
serverOptions: {
ca: tls?.ca as string | Buffer | (string | Buffer)[] | undefined,
cert: tls?.cert as string | Buffer | (string | Buffer)[] | undefined,
dhparam: tls?.dhParamsFile ? fs.readFileSync(tls.dhParamsFile) : undefined,
key: tls?.key as string | Buffer | (string | Buffer)[] | undefined,
passphrase: tls?.passphrase,
},
hostname: listenOn.hostname,
port: listenOn.port,
fetch: async (request) => {
this.pendingRequests++;
const response = await this.#onRequest(request, this);
this.pendingRequests--;
return response;
},
}, (info) => { }) as http.Server;
this.#server.listen(listenOn.port, listenOn.hostname);
this.#server.on('error', (error) => {
if (this.#onError) this.#onError(error);
});
this.#server.on('upgrade', (req, duplex, head) => {
this.#onRequest(toWebRequest(req, undefined, this.#maxReqBodySize, true), this);
});
const httpHandler = async (res: uws.HttpResponse, req: uws.HttpRequest) => {
this.pendingRequests++;
res.onAborted(() => {
if (this.#onError) this.#onError(new Error('Aborted'));
});
const headers = new Headers();
req.forEach((name, value) => headers.append(name, value));
const query = req.getQuery();
const url = `${tls ? 'https' : 'http'}://${headers.get('host')}${req.getUrl()}${query ? '?' + query : ''}`;
const method = req.getMethod();
const body = method === 'GET' || method === 'HEAD' ? undefined : await getUwsHttpResponseBody(res, this.#maxReqBodySize);
const webReq = new Request(url, {
method,
headers,
body: body?.byteLength ? body : undefined,
});
Reflect.set(webReq, uwsInternalFieldSymbol, { res, req });
const webRes = await this.#onRequest(webReq, this);
if (Reflect.get(req, uwsUpgradableRequestSymbol)) return void this.pendingRequests--;
if (!webRes) return this.pendingRequests--, void res.endWithoutBody();
for (const [name, value] of webRes.headers) {
res.writeHeader(name, value);
}
res.writeStatus(`${webRes.status} ${webRes.statusText}`);
res.end(await webRes.arrayBuffer());
this.pendingRequests--;
};
this.#uws.any('/*', httpHandler);
if (this.#ws) this.#uws.ws('/*', {
sendPingsAutomatically: this.#ws.sendPings ?? true,
idleTimeout: this.#ws.idleTimeout ?? 120,
maxBackpressure: this.#ws.backpressureLimit ?? 1024 * 1024 * 16,
maxPayloadLength: this.#ws.maxPayloadLength ?? 1024 * 1024 * 16,
closeOnBackpressureLimit: Number(this.#ws.closeOnBackpressureLimit ?? false),
compression: !this.#ws.perMessageDeflate || typeof this.#ws.perMessageDeflate === 'boolean'
? (this.#ws.perMessageDeflate ? uws.SHARED_COMPRESSOR : uws.DISABLED)
: !this.#ws.perMessageDeflate.compress || typeof this.#ws.perMessageDeflate.compress === 'boolean'
? (this.#ws.perMessageDeflate.compress ? uws.SHARED_COMPRESSOR : uws.DISABLED)
: wsCompressors[this.#ws.perMessageDeflate.compress],
close: (ws, code, message) => {
if (this.#ws?.close) this.#ws.close(toBunSocket(ws), code, Buffer.from(message).toString('utf8'));
},
drain: (ws) => {
if (this.#ws?.drain) this.#ws.drain(toBunSocket(ws));
},
message: (ws, message, isBinary) => {
if (this.#ws?.message) {
const buf = Buffer.from(message);
this.#ws.message(toBunSocket(ws), isBinary ? buf : buf.toString('utf8'));
}
},
open: (ws) => {
this.pendingWebSockets++;
if (this.#ws?.open) this.#ws.open(toBunSocket(ws));
this.#wss = new WebSocketServer({
server: this.#server,
perMessageDeflate: typeof ws?.perMessageDeflate === 'boolean'
? ws.perMessageDeflate : !!ws?.perMessageDeflate?.compress || !!ws?.perMessageDeflate?.decompress,
backlog: ws?.backpressureLimit,
// @ts-expect-error untyped "maxPayload" option but it's in the docs
maxPayload: ws?.maxPayloadLength,
});
this.#wss.on('connection', (socket, req) => {
this.pendingWebSockets++;
this.#ws?.open?.(toBunSocket(socket, this));
if (this.#ws?.close) socket.onclose = (event) => {
this.#ws?.close?.(toBunSocket(socket, this), event.code, event.reason);
this.pendingWebSockets--;
},
ping: (ws, message) => {
if (this.#ws?.ping) this.#ws.ping(toBunSocket(ws), Buffer.from(message));
},
pong: (ws, message) => {
if (this.#ws?.pong) this.#ws.pong(toBunSocket(ws), Buffer.from(message));
},
subscription: (ws, topic, newCount, oldCount) => {
},
upgrade: async (res, req, context) => {
const secwskey = req.getHeader('sec-websocket-key');
const secwsprotocol = req.getHeader('sec-websocket-protocol');
const secwsextensions = req.getHeader('sec-websocket-extensions');
Reflect.set(req, uwsUpgradableRequestSymbol, { secwskey, secwsprotocol, secwsextensions, context });
await httpHandler(res, req);
},
};
if (this.#ws?.message) socket.onmessage = (event) => this.#ws?.message?.(toBunSocket(socket, this), event.data);
if (this.#ws?.ping) socket.addEventListener('ping', (event) => this.#ws?.ping?.(toBunSocket(socket, this), event.data));
if (this.#ws?.pong) socket.addEventListener('pong', (event) => this.#ws?.pong?.(toBunSocket(socket, this), event.data));
});
if (listenOn.unix) this.#uws.listen_unix((listenSock) => { this.#listenSock = listenSock; }, listenOn.unix);
else this.#uws.listen(listenOn.hostname!, listenOn.port ?? 0, (listenSock) => { this.#listenSock = listenSock; });
}
#listenSock: uws.us_listen_socket | null = null;
#uws: uws.TemplatedApp;
#wss: WebSocketServer;
#server: http.Server;
#ws: WebSocketHandler<T> | null;
#tls: TLSOptions<T> | null;
//#unix?: string;
@@ -159,7 +104,8 @@ class Server<T = undefined> implements BunServer {
development: boolean;
hostname: string;
get port(): number {
const port = uws.us_socket_local_port(this.#listenSock!);
const addrinfo = this.#server.address();
const port = typeof addrinfo === 'string' ? -1 : addrinfo?.port!;
return port === -1 ? undefined as unknown as number : port;
}
id: string;
@@ -172,39 +118,31 @@ class Server<T = undefined> implements BunServer {
return this.#onRequest(request, this) as Response | Promise<Response>;
}
publish(topic: string, data: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer, compress?: boolean): number {
const message = (typeof data === 'string' ? data : 'buffer' in data ? data.buffer : data) as string | ArrayBuffer;
const success = this.#uws.publish(topic, message, typeof message !== 'string', compress);
if (!success) return 0;
return typeof message === 'string' ? message.length : message.byteLength;
this.#wss.clients.forEach((client) => {
if (client.readyState !== 1) return;
const bunSocket = Reflect.get(client, '@@asBunSocket') as BunSocket<T> | undefined;
if (!bunSocket) throw new Error('Internal error: Expected client to have a BunSocket reference');
if (bunSocket.isSubscribed(topic)) bunSocket.send(data, compress);
});
return 0;
}
upgrade<T = undefined>(request: Request, options?: { headers?: HeadersInit; data?: T; }): boolean {
const uwsInfo = Reflect.get(request, uwsInternalFieldSymbol) as uwsInternalField | undefined;
if (!uwsInfo) return false; // This polyfill can only upgrade requests created by itself
const { req, res } = uwsInfo;
const ctx = Reflect.get(req, uwsUpgradableRequestSymbol) as uwsUpgradableRequest | undefined;
if (!ctx) return false;
res.upgrade({}, ctx.secwskey, ctx.secwsprotocol, ctx.secwsextensions, ctx.context);
return true;
return Reflect.get(request, requestUpgradedSymbol) ?? false;
}
requestIP(request: Request): SocketAddress | null {
const uwsInfo = Reflect.get(request, uwsInternalFieldSymbol) as uwsInternalField | undefined;
if (!uwsInfo) return null;
const fullIP = new TextDecoder().decode(uwsInfo.res.getRemoteAddressAsText());
const [ip, port] = fullIP.split(':');
return {
address: ip,
port: Number(port),
family: ip.includes('.') ? 'IPv4' : 'IPv6',
};
const addrinfo = Reflect.get(request, requestRemoteIPSymbol) as AddressInfo & { family: 'IPv4' | 'IPv6'; } | undefined;
if (addrinfo) return addrinfo;
else return null;
}
reload(options: Serve): void {
this.#onRequest = options.fetch ?? this.#onRequest;
this.#onError = options.error ?? this.#onError;
}
stop(closeActiveConnections?: boolean): void {
if (closeActiveConnections) return void this.#uws.close();
this.#closed = true;
uws.us_listen_socket_close(this.#listenSock!);
if (closeActiveConnections) this.#wss.clients.forEach((client) => client.close());
this.#wss.close();
this.#server.close();
}
};
@@ -225,27 +163,73 @@ function generateSecWSAccept(secWSKey: string) {
.digest('base64');
}
// TODO
function toBunSocket(socket: uws.WebSocket<any>) {
return socket as unknown as ServerWebSocket<any>;
class BunSocket<T extends any> implements ServerWebSocket {
#ws: WebSocket;
#server: Server<T>;
constructor(socket: WebSocket, server: Server<T>) {
this.#ws = socket;
this.#server = server;
Reflect.set(socket, '@@asBunSocket', this);
}
send(data: string | BufferSource, compress?: boolean | undefined): number {
this.#ws.send(data);
return typeof data === 'string' ? Buffer.byteLength(data, 'utf8') : data.byteLength;
}
sendText(data: string, compress?: boolean | undefined): number {
this.#ws.send(data);
return Buffer.byteLength(data, 'utf8');
}
sendBinary(data: BufferSource, compress?: boolean | undefined): number {
this.#ws.send(data);
return data.byteLength;
}
close(code?: number | undefined, reason?: string | undefined): void {
this.#ws.close(code, reason);
}
terminate(): void {
this.#ws.terminate();
}
ping(data?: string | BufferSource | undefined): number {
this.#ws.ping(data);
return typeof data === 'string' ? Buffer.byteLength(data, 'utf8') : data?.byteLength ?? 0;
}
pong(data?: string | BufferSource | undefined): number {
this.#ws.pong(data);
return typeof data === 'string' ? Buffer.byteLength(data, 'utf8') : data?.byteLength ?? 0;
}
publish(topic: string, data: string | BufferSource, compress?: boolean | undefined): number {
return this.#server.publish(topic, data, compress);
}
publishText(topic: string, data: string, compress?: boolean | undefined): number {
return this.publish(topic, data, compress);
}
publishBinary(topic: string, data: BufferSource, compress?: boolean | undefined): number {
return this.publish(topic, data, compress);
}
subscribe(topic: string): void {
this.#subscribedTopics.add(topic);
}
unsubscribe(topic: string): void {
this.#subscribedTopics.delete(topic);
}
isSubscribed(topic: string): boolean {
return this.#subscribedTopics.has(topic);
}
cork(callback: (ws: ServerWebSocket<T>) => T): T {
return callback(this);
}
get remoteAddress(): string {
return this.#ws.url;
};
get readyState(): WebSocketReadyState {
return this.#ws.readyState;
};
#subscribedTopics = new Set<string>();
binaryType?: 'nodebuffer' | 'arraybuffer' | 'uint8array' | undefined;
// @ts-expect-error generic mess
data: T;
}
async function getUwsHttpResponseBody(res: uws.HttpResponse, maxSize: number) {
return new Promise<Buffer>((resolve, reject) => {
const buffers: Buffer[] = [];
let totalSize = 0;
res.onData((ab, isLast) => {
const chunk = Buffer.from(ab);
totalSize += chunk.byteLength;
if (totalSize > maxSize) return void res.close(); // calls onAborted
if (!isLast) return void buffers.push(chunk);
try {
if (buffers.length === 0) return void resolve(Buffer.from(structuredClone(ab)));
buffers.push(chunk);
return void resolve(Buffer.concat(buffers, totalSize));
} catch (e) {
return void res.close(); // calls onAborted
}
});
});
function toBunSocket<T>(socket: WebSocket, server: Server<T>) {
return new BunSocket<T>(socket, server);
}

View File

@@ -7,7 +7,8 @@ import { splitCookiesString } from 'set-cookie-parser';
export const requestNodeResSymbol = Symbol('bun-polyfills.serve.nodeReq');
export const requestRemoteIPSymbol = Symbol('bun-polyfills.serve.remoteIP');
export const toWebRequest = (nodeReq: IncomingMessage, nodeRes: ServerResponse, bodySizeLimit?: number): Request => {
export const requestUpgradedSymbol = Symbol('bun-polyfills.serve.upgraded');
export const toWebRequest = (nodeReq: IncomingMessage, nodeRes?: ServerResponse, bodySizeLimit?: number, upgraded = false): Request => {
const webReq = new Request('http://' + nodeReq.headers.host! + nodeReq.url, {
duplex: 'half',
method: nodeReq.method,
@@ -18,8 +19,9 @@ export const toWebRequest = (nodeReq: IncomingMessage, nodeRes: ServerResponse,
address: nodeReq.socket.remoteAddress, port: nodeReq.socket.remotePort, family: nodeReq.socket.remoteFamily,
});
Reflect.set(webReq, requestNodeResSymbol, nodeRes);
Reflect.set(webReq, requestUpgradedSymbol, upgraded);
return webReq;
}
};
export const sendWebResponse = (nodeRes: ServerResponse, webRes: Response): void => {
const headers = Object.fromEntries(webRes.headers);
@@ -47,7 +49,7 @@ export const sendWebResponse = (nodeRes: ServerResponse, webRes: Response): void
nodeRes.off('error', cancel);
// If the reader has already been interrupted with an error earlier,
// then it will appear here, it is useless, but it needs to be caught.
reader.cancel(error).catch(() => {});
reader.cancel(error).catch(() => { });
if (error) nodeRes.destroy(error);
};
nodeRes.on('close', cancel);
@@ -66,7 +68,7 @@ export const sendWebResponse = (nodeRes: ServerResponse, webRes: Response): void
cancel(error instanceof Error ? error : new Error(String(error)));
}
}
}
};
class HTTPError extends Error {
constructor(status: number, reason: string) {