diff --git a/packages/bun-polyfills/bun.lockb b/packages/bun-polyfills/bun.lockb index 05d968a3db..38e9a5a354 100755 Binary files a/packages/bun-polyfills/bun.lockb and b/packages/bun-polyfills/bun.lockb differ diff --git a/packages/bun-polyfills/package.json b/packages/bun-polyfills/package.json index 3c57f4ba50..93313a88cc 100644 --- a/packages/bun-polyfills/package.json +++ b/packages/bun-polyfills/package.json @@ -32,6 +32,7 @@ "typescript": "^5.2.2" }, "dependencies": { + "@hono/node-server": "^1.3.1", "argon2": "^0.31.2", "bcryptjs": "^2.4.3", "better-sqlite3": "^9.1.1", @@ -39,6 +40,7 @@ "chalk": "^5.3.0", "dateformat": "^5.0.3", "expect": "^29.7.0", + "hono": "^3.11.4", "html-rewriter-wasm": "^0.4.1", "isomorphic-ws": "^5.0.0", "jest-each": "^29.7.0", diff --git a/packages/bun-polyfills/src/global/process.ts b/packages/bun-polyfills/src/global/process.ts index 6cdcfc0bc3..8ef5b7d9aa 100644 --- a/packages/bun-polyfills/src/global/process.ts +++ b/packages/bun-polyfills/src/global/process.ts @@ -16,7 +16,7 @@ if (typeof process === 'object' && process !== null) { process.versions.c_ares = '0e7a5dee0fbb04080750cf6eabbe89d8bae87faa' satisfies Process['versions'][string]; process.versions.zig = '0.12.0-dev.1604+caae40c21' satisfies Process['versions'][string]; process.versions.bun = '1.0.13' satisfies Process['versions'][string]; - Reflect.set(process, 'revision', '5e0160552a0b63087cdf1cd5dd62f69fdca5d875' satisfies Process['revision']); + Reflect.set(process, 'revision', 'b8dcf2caf869bd911a3cb5df77f58fdce20bc185' satisfies Process['revision']); /** @end_generated_code */ // Doesn't work on Windows sadly diff --git a/packages/bun-polyfills/src/modules/bun.ts b/packages/bun-polyfills/src/modules/bun.ts index 790eaec145..fd3658f09f 100644 --- a/packages/bun-polyfills/src/modules/bun.ts +++ b/packages/bun-polyfills/src/modules/bun.ts @@ -48,7 +48,7 @@ export const main = path.resolve(process.cwd(), process.argv[1] ?? 'repl') satis //? These are automatically updated on build by tools/updateversions.ts, do not edit manually. export const version = '1.0.13' satisfies typeof Bun.version; -export const revision = '5e0160552a0b63087cdf1cd5dd62f69fdca5d875' satisfies typeof Bun.revision; +export const revision = 'b8dcf2caf869bd911a3cb5df77f58fdce20bc185' satisfies typeof Bun.revision; export const gc = ( globalThis.gc diff --git a/packages/bun-polyfills/src/modules/bun/serve.ts b/packages/bun-polyfills/src/modules/bun/serve.ts index 6cf0dfc84c..4d4f8d90d7 100644 --- a/packages/bun-polyfills/src/modules/bun/serve.ts +++ b/packages/bun-polyfills/src/modules/bun/serve.ts @@ -1,28 +1,14 @@ /// import type { - Server as BunServer, Serve, TLSServeOptions, UnixTLSServeOptions, TLSWebSocketServeOptions, UnixTLSWebSocketServeOptions, ArrayBufferView, SocketAddress, WebSocketHandler, ServerWebSocket, WebSocketCompressor + Serve, TLSServeOptions, UnixTLSServeOptions, TLSWebSocketServeOptions, UnixTLSWebSocketServeOptions, + Server as BunServer, ArrayBufferView, SocketAddress, WebSocketHandler, ServerWebSocket, WebSocketCompressor } from 'bun'; +import { serve as honoServe } from '@hono/node-server'; +import { WebSocketServer, type AddressInfo } from 'ws'; import { createHash } from 'node:crypto'; -import uws from 'uWebSockets.js'; - -type uwsInternalField = { res: uws.HttpResponse; req: uws.HttpRequest; }; -type uwsUpgradableRequest = { secwskey: string; secwsprotocol: string; secwsextensions: string; context: uws.us_socket_context_t; }; -const uwsInternalFieldSymbol = Symbol('bun-polyfills.serve.uwsInternalField'); -const uwsUpgradableRequestSymbol = Symbol('bun-polyfills.serve.uwsUpgradableRequest'); - -const wsCompressors: Record = { - '128KB': uws.DEDICATED_COMPRESSOR_128KB, - '16KB': uws.DEDICATED_COMPRESSOR_16KB, - '256KB': uws.DEDICATED_COMPRESSOR_256KB, - '32KB': uws.DEDICATED_COMPRESSOR_32KB, - '3KB': uws.DEDICATED_COMPRESSOR_3KB, - '4KB': uws.DEDICATED_COMPRESSOR_4KB, - '64KB': uws.DEDICATED_COMPRESSOR_64KB, - '8KB': uws.DEDICATED_COMPRESSOR_8KB, - dedicated: uws.DEDICATED_COMPRESSOR_32KB, - disable: uws.DISABLED, - shared: uws.SHARED_COMPRESSOR, -} as const; +import fs from 'node:fs'; +import http from 'node:http'; +import { requestRemoteIPSymbol, requestUpgradedSymbol, toWebRequest } from '../../utils/webconv.js'; export function serve(options: Serve): BunServer { return new Server(options); @@ -55,100 +41,59 @@ class Server implements BunServer { this.#onRequest = options.fetch; if (!this.#onRequest) throw new TypeError('Expected fetch() to be a function'); - this.#uws = uws[tls ? 'SSLApp' : 'App']({ - ca_file_name: tls?.ca instanceof Blob ? tls.ca.name - : tls?.ca instanceof Array ? (tls.ca[0] instanceof Blob ? tls.ca[0].name : tls.ca[0]) : tls?.ca, - cert_file_name: tls?.cert instanceof Blob ? tls.cert.name - : tls?.cert instanceof Array ? (tls.cert[0] instanceof Blob ? tls.cert[0].name : tls.cert[0]) : tls?.cert, - dh_params_file_name: tls?.dhParamsFile, - key_file_name: tls?.key instanceof Blob ? tls.key.name - : tls?.key instanceof Array ? (tls.key[0] instanceof Blob ? tls.key[0].name : tls.key[0]) : tls?.key, - passphrase: tls?.passphrase, - ssl_ciphers: tls?.secureOptions?.toString(), - ssl_prefer_low_memory_usage: tls?.lowMemoryMode, + if (tls?.ca instanceof Blob) tls.ca = tls.ca.name; + if (tls?.ca instanceof Array) tls.ca = tls.ca.map((ca) => ca instanceof Blob ? ca.name! : ca); + if (tls?.cert instanceof Blob) tls.cert = tls.cert.name; + if (tls?.cert instanceof Array) tls.cert = tls.cert.map((cert) => cert instanceof Blob ? cert.name! : cert); + if (tls?.key instanceof Blob) tls.key = tls.key.name; + if (tls?.key instanceof Array) tls.key = tls.key.map((key) => key instanceof Blob ? key.name! : key); + this.#server = honoServe({ + serverOptions: { + ca: tls?.ca as string | Buffer | (string | Buffer)[] | undefined, + cert: tls?.cert as string | Buffer | (string | Buffer)[] | undefined, + dhparam: tls?.dhParamsFile ? fs.readFileSync(tls.dhParamsFile) : undefined, + key: tls?.key as string | Buffer | (string | Buffer)[] | undefined, + passphrase: tls?.passphrase, + }, + hostname: listenOn.hostname, + port: listenOn.port, + fetch: async (request) => { + this.pendingRequests++; + const response = await this.#onRequest(request, this); + this.pendingRequests--; + return response; + }, + }, (info) => { }) as http.Server; + this.#server.listen(listenOn.port, listenOn.hostname); + this.#server.on('error', (error) => { + if (this.#onError) this.#onError(error); + }); + this.#server.on('upgrade', (req, duplex, head) => { + this.#onRequest(toWebRequest(req, undefined, this.#maxReqBodySize, true), this); }); - const httpHandler = async (res: uws.HttpResponse, req: uws.HttpRequest) => { - this.pendingRequests++; - res.onAborted(() => { - if (this.#onError) this.#onError(new Error('Aborted')); - }); - const headers = new Headers(); - req.forEach((name, value) => headers.append(name, value)); - const query = req.getQuery(); - const url = `${tls ? 'https' : 'http'}://${headers.get('host')}${req.getUrl()}${query ? '?' + query : ''}`; - const method = req.getMethod(); - const body = method === 'GET' || method === 'HEAD' ? undefined : await getUwsHttpResponseBody(res, this.#maxReqBodySize); - const webReq = new Request(url, { - method, - headers, - body: body?.byteLength ? body : undefined, - }); - Reflect.set(webReq, uwsInternalFieldSymbol, { res, req }); - const webRes = await this.#onRequest(webReq, this); - if (Reflect.get(req, uwsUpgradableRequestSymbol)) return void this.pendingRequests--; - if (!webRes) return this.pendingRequests--, void res.endWithoutBody(); - for (const [name, value] of webRes.headers) { - res.writeHeader(name, value); - } - res.writeStatus(`${webRes.status} ${webRes.statusText}`); - res.end(await webRes.arrayBuffer()); - this.pendingRequests--; - }; - this.#uws.any('/*', httpHandler); - - if (this.#ws) this.#uws.ws('/*', { - sendPingsAutomatically: this.#ws.sendPings ?? true, - idleTimeout: this.#ws.idleTimeout ?? 120, - maxBackpressure: this.#ws.backpressureLimit ?? 1024 * 1024 * 16, - maxPayloadLength: this.#ws.maxPayloadLength ?? 1024 * 1024 * 16, - closeOnBackpressureLimit: Number(this.#ws.closeOnBackpressureLimit ?? false), - compression: !this.#ws.perMessageDeflate || typeof this.#ws.perMessageDeflate === 'boolean' - ? (this.#ws.perMessageDeflate ? uws.SHARED_COMPRESSOR : uws.DISABLED) - : !this.#ws.perMessageDeflate.compress || typeof this.#ws.perMessageDeflate.compress === 'boolean' - ? (this.#ws.perMessageDeflate.compress ? uws.SHARED_COMPRESSOR : uws.DISABLED) - : wsCompressors[this.#ws.perMessageDeflate.compress], - - close: (ws, code, message) => { - if (this.#ws?.close) this.#ws.close(toBunSocket(ws), code, Buffer.from(message).toString('utf8')); - }, - drain: (ws) => { - if (this.#ws?.drain) this.#ws.drain(toBunSocket(ws)); - }, - message: (ws, message, isBinary) => { - if (this.#ws?.message) { - const buf = Buffer.from(message); - this.#ws.message(toBunSocket(ws), isBinary ? buf : buf.toString('utf8')); - } - }, - open: (ws) => { - this.pendingWebSockets++; - if (this.#ws?.open) this.#ws.open(toBunSocket(ws)); + this.#wss = new WebSocketServer({ + server: this.#server, + perMessageDeflate: typeof ws?.perMessageDeflate === 'boolean' + ? ws.perMessageDeflate : !!ws?.perMessageDeflate?.compress || !!ws?.perMessageDeflate?.decompress, + backlog: ws?.backpressureLimit, + // @ts-expect-error untyped "maxPayload" option but it's in the docs + maxPayload: ws?.maxPayloadLength, + }); + this.#wss.on('connection', (socket, req) => { + this.pendingWebSockets++; + this.#ws?.open?.(toBunSocket(socket, this)); + if (this.#ws?.close) socket.onclose = (event) => { + this.#ws?.close?.(toBunSocket(socket, this), event.code, event.reason); this.pendingWebSockets--; - }, - ping: (ws, message) => { - if (this.#ws?.ping) this.#ws.ping(toBunSocket(ws), Buffer.from(message)); - }, - pong: (ws, message) => { - if (this.#ws?.pong) this.#ws.pong(toBunSocket(ws), Buffer.from(message)); - }, - subscription: (ws, topic, newCount, oldCount) => { - - }, - upgrade: async (res, req, context) => { - const secwskey = req.getHeader('sec-websocket-key'); - const secwsprotocol = req.getHeader('sec-websocket-protocol'); - const secwsextensions = req.getHeader('sec-websocket-extensions'); - Reflect.set(req, uwsUpgradableRequestSymbol, { secwskey, secwsprotocol, secwsextensions, context }); - await httpHandler(res, req); - }, + }; + if (this.#ws?.message) socket.onmessage = (event) => this.#ws?.message?.(toBunSocket(socket, this), event.data); + if (this.#ws?.ping) socket.addEventListener('ping', (event) => this.#ws?.ping?.(toBunSocket(socket, this), event.data)); + if (this.#ws?.pong) socket.addEventListener('pong', (event) => this.#ws?.pong?.(toBunSocket(socket, this), event.data)); }); - - if (listenOn.unix) this.#uws.listen_unix((listenSock) => { this.#listenSock = listenSock; }, listenOn.unix); - else this.#uws.listen(listenOn.hostname!, listenOn.port ?? 0, (listenSock) => { this.#listenSock = listenSock; }); } - #listenSock: uws.us_listen_socket | null = null; - #uws: uws.TemplatedApp; + #wss: WebSocketServer; + #server: http.Server; #ws: WebSocketHandler | null; #tls: TLSOptions | null; //#unix?: string; @@ -159,7 +104,8 @@ class Server implements BunServer { development: boolean; hostname: string; get port(): number { - const port = uws.us_socket_local_port(this.#listenSock!); + const addrinfo = this.#server.address(); + const port = typeof addrinfo === 'string' ? -1 : addrinfo?.port!; return port === -1 ? undefined as unknown as number : port; } id: string; @@ -172,39 +118,31 @@ class Server implements BunServer { return this.#onRequest(request, this) as Response | Promise; } publish(topic: string, data: string | ArrayBufferView | ArrayBuffer | SharedArrayBuffer, compress?: boolean): number { - const message = (typeof data === 'string' ? data : 'buffer' in data ? data.buffer : data) as string | ArrayBuffer; - const success = this.#uws.publish(topic, message, typeof message !== 'string', compress); - if (!success) return 0; - return typeof message === 'string' ? message.length : message.byteLength; + this.#wss.clients.forEach((client) => { + if (client.readyState !== 1) return; + const bunSocket = Reflect.get(client, '@@asBunSocket') as BunSocket | undefined; + if (!bunSocket) throw new Error('Internal error: Expected client to have a BunSocket reference'); + if (bunSocket.isSubscribed(topic)) bunSocket.send(data, compress); + }); + return 0; } upgrade(request: Request, options?: { headers?: HeadersInit; data?: T; }): boolean { - const uwsInfo = Reflect.get(request, uwsInternalFieldSymbol) as uwsInternalField | undefined; - if (!uwsInfo) return false; // This polyfill can only upgrade requests created by itself - const { req, res } = uwsInfo; - const ctx = Reflect.get(req, uwsUpgradableRequestSymbol) as uwsUpgradableRequest | undefined; - if (!ctx) return false; - res.upgrade({}, ctx.secwskey, ctx.secwsprotocol, ctx.secwsextensions, ctx.context); - return true; + return Reflect.get(request, requestUpgradedSymbol) ?? false; } requestIP(request: Request): SocketAddress | null { - const uwsInfo = Reflect.get(request, uwsInternalFieldSymbol) as uwsInternalField | undefined; - if (!uwsInfo) return null; - const fullIP = new TextDecoder().decode(uwsInfo.res.getRemoteAddressAsText()); - const [ip, port] = fullIP.split(':'); - return { - address: ip, - port: Number(port), - family: ip.includes('.') ? 'IPv4' : 'IPv6', - }; + const addrinfo = Reflect.get(request, requestRemoteIPSymbol) as AddressInfo & { family: 'IPv4' | 'IPv6'; } | undefined; + if (addrinfo) return addrinfo; + else return null; } reload(options: Serve): void { this.#onRequest = options.fetch ?? this.#onRequest; this.#onError = options.error ?? this.#onError; } stop(closeActiveConnections?: boolean): void { - if (closeActiveConnections) return void this.#uws.close(); this.#closed = true; - uws.us_listen_socket_close(this.#listenSock!); + if (closeActiveConnections) this.#wss.clients.forEach((client) => client.close()); + this.#wss.close(); + this.#server.close(); } }; @@ -225,27 +163,73 @@ function generateSecWSAccept(secWSKey: string) { .digest('base64'); } -// TODO -function toBunSocket(socket: uws.WebSocket) { - return socket as unknown as ServerWebSocket; +class BunSocket implements ServerWebSocket { + #ws: WebSocket; + #server: Server; + constructor(socket: WebSocket, server: Server) { + this.#ws = socket; + this.#server = server; + Reflect.set(socket, '@@asBunSocket', this); + } + send(data: string | BufferSource, compress?: boolean | undefined): number { + this.#ws.send(data); + return typeof data === 'string' ? Buffer.byteLength(data, 'utf8') : data.byteLength; + } + sendText(data: string, compress?: boolean | undefined): number { + this.#ws.send(data); + return Buffer.byteLength(data, 'utf8'); + } + sendBinary(data: BufferSource, compress?: boolean | undefined): number { + this.#ws.send(data); + return data.byteLength; + } + close(code?: number | undefined, reason?: string | undefined): void { + this.#ws.close(code, reason); + } + terminate(): void { + this.#ws.terminate(); + } + ping(data?: string | BufferSource | undefined): number { + this.#ws.ping(data); + return typeof data === 'string' ? Buffer.byteLength(data, 'utf8') : data?.byteLength ?? 0; + } + pong(data?: string | BufferSource | undefined): number { + this.#ws.pong(data); + return typeof data === 'string' ? Buffer.byteLength(data, 'utf8') : data?.byteLength ?? 0; + } + publish(topic: string, data: string | BufferSource, compress?: boolean | undefined): number { + return this.#server.publish(topic, data, compress); + } + publishText(topic: string, data: string, compress?: boolean | undefined): number { + return this.publish(topic, data, compress); + } + publishBinary(topic: string, data: BufferSource, compress?: boolean | undefined): number { + return this.publish(topic, data, compress); + } + subscribe(topic: string): void { + this.#subscribedTopics.add(topic); + } + unsubscribe(topic: string): void { + this.#subscribedTopics.delete(topic); + } + isSubscribed(topic: string): boolean { + return this.#subscribedTopics.has(topic); + } + cork(callback: (ws: ServerWebSocket) => T): T { + return callback(this); + } + get remoteAddress(): string { + return this.#ws.url; + }; + get readyState(): WebSocketReadyState { + return this.#ws.readyState; + }; + #subscribedTopics = new Set(); + binaryType?: 'nodebuffer' | 'arraybuffer' | 'uint8array' | undefined; + // @ts-expect-error generic mess + data: T; } -async function getUwsHttpResponseBody(res: uws.HttpResponse, maxSize: number) { - return new Promise((resolve, reject) => { - const buffers: Buffer[] = []; - let totalSize = 0; - res.onData((ab, isLast) => { - const chunk = Buffer.from(ab); - totalSize += chunk.byteLength; - if (totalSize > maxSize) return void res.close(); // calls onAborted - if (!isLast) return void buffers.push(chunk); - try { - if (buffers.length === 0) return void resolve(Buffer.from(structuredClone(ab))); - buffers.push(chunk); - return void resolve(Buffer.concat(buffers, totalSize)); - } catch (e) { - return void res.close(); // calls onAborted - } - }); - }); +function toBunSocket(socket: WebSocket, server: Server) { + return new BunSocket(socket, server); } diff --git a/packages/bun-polyfills/src/utils/webconv.ts b/packages/bun-polyfills/src/utils/webconv.ts index c60541601e..59e970d777 100644 --- a/packages/bun-polyfills/src/utils/webconv.ts +++ b/packages/bun-polyfills/src/utils/webconv.ts @@ -7,7 +7,8 @@ import { splitCookiesString } from 'set-cookie-parser'; export const requestNodeResSymbol = Symbol('bun-polyfills.serve.nodeReq'); export const requestRemoteIPSymbol = Symbol('bun-polyfills.serve.remoteIP'); -export const toWebRequest = (nodeReq: IncomingMessage, nodeRes: ServerResponse, bodySizeLimit?: number): Request => { +export const requestUpgradedSymbol = Symbol('bun-polyfills.serve.upgraded'); +export const toWebRequest = (nodeReq: IncomingMessage, nodeRes?: ServerResponse, bodySizeLimit?: number, upgraded = false): Request => { const webReq = new Request('http://' + nodeReq.headers.host! + nodeReq.url, { duplex: 'half', method: nodeReq.method, @@ -18,8 +19,9 @@ export const toWebRequest = (nodeReq: IncomingMessage, nodeRes: ServerResponse, address: nodeReq.socket.remoteAddress, port: nodeReq.socket.remotePort, family: nodeReq.socket.remoteFamily, }); Reflect.set(webReq, requestNodeResSymbol, nodeRes); + Reflect.set(webReq, requestUpgradedSymbol, upgraded); return webReq; -} +}; export const sendWebResponse = (nodeRes: ServerResponse, webRes: Response): void => { const headers = Object.fromEntries(webRes.headers); @@ -47,7 +49,7 @@ export const sendWebResponse = (nodeRes: ServerResponse, webRes: Response): void nodeRes.off('error', cancel); // If the reader has already been interrupted with an error earlier, // then it will appear here, it is useless, but it needs to be caught. - reader.cancel(error).catch(() => {}); + reader.cancel(error).catch(() => { }); if (error) nodeRes.destroy(error); }; nodeRes.on('close', cancel); @@ -66,7 +68,7 @@ export const sendWebResponse = (nodeRes: ServerResponse, webRes: Response): void cancel(error instanceof Error ? error : new Error(String(error))); } } -} +}; class HTTPError extends Error { constructor(status: number, reason: string) {