From 822e3b8cf02e7bdfbca5d2b0bb837dced1b1ca40 Mon Sep 17 00:00:00 2001 From: Ciro Spaciari Date: Wed, 21 May 2025 15:31:31 -0700 Subject: [PATCH] more --- src/bun.js/api/bun/h2_frame_parser.zig | 4 +- src/js/node/http2.ts | 46 ++++++++----- src/js/node/net.ts | 91 ++++++++++++++++++++++++++ 3 files changed, 120 insertions(+), 21 deletions(-) diff --git a/src/bun.js/api/bun/h2_frame_parser.zig b/src/bun.js/api/bun/h2_frame_parser.zig index 5560f0e792..d339a8726f 100644 --- a/src/bun.js/api/bun/h2_frame_parser.zig +++ b/src/bun.js/api/bun/h2_frame_parser.zig @@ -1262,9 +1262,6 @@ pub const H2FrameParser = struct { this.windowSize += WINDOW_INCREMENT_SIZE * total_increment; // we will need at least this many increments to send all the streams this.sendWindowUpdate(0, UInt31WithReserved.from(WINDOW_INCREMENT_SIZE * total_increment)); } - - // incrementing the window size is a good time to flush - _ = this.flush(); } pub fn setSettings(this: *H2FrameParser, settings: FullSettingsPayload) bool { @@ -2387,6 +2384,7 @@ pub const H2FrameParser = struct { } this.readBuffer.reset(); this.remoteSettings = remoteSettings; + defer this.incrementWindowSizeIfNeeded(); if (remoteSettings.initialWindowSize >= this.remoteUsedWindowSize) { defer _ = this.flushStreamQueue(); this.remoteWindowSize = remoteSettings.initialWindowSize; diff --git a/src/js/node/http2.ts b/src/js/node/http2.ts index 0b0bb67739..b30145b54f 100644 --- a/src/js/node/http2.ts +++ b/src/js/node/http2.ts @@ -1879,10 +1879,23 @@ class Http2Stream extends Duplex { validateFunction(callback, "callback"); this.once("close", callback); } - this.rstCode = code; + const { ending } = this._writableState; + if (!ending) { + // If the writable side of the Http2Stream is still open, emit the + // 'aborted' event and set the aborted flag. + if (!this.aborted) { + this[kAborted] = true; + this.emit("aborted"); + } + this.end(); + } markStreamClosed(this); - - session[bunHTTP2Native]?.rstStream(this.#id, code); + if (this.writableFinished) { + this.rstCode = code; + session[bunHTTP2Native]?.rstStream(this.#id, code); + } else { + this.once("finish", rstNextTick.bind(session, this.#id, code)); + } } } _destroy(err, callback) { @@ -1897,7 +1910,7 @@ class Http2Stream extends Duplex { } // at this state destroyed will be true but we need to close the writable side this._writableState.destroyed = false; - this.end(); // why this is needed? + this.end(); // we now restore the destroyed flag this._writableState.destroyed = true; } @@ -1942,6 +1955,15 @@ class Http2Stream extends Duplex { _final(callback) { const status = this[bunHTTP2StreamStatus]; + const session = this[bunHTTP2Session]; + if (session) { + const native = session[bunHTTP2Native]; + if (native) { + this[bunHTTP2StreamStatus] |= StreamState.FinalCalled; + native.writeStream(this.#id, "", "ascii", true, callback); + return; + } + } if ((status & StreamState.WritableClosed) !== 0 || (status & StreamState.Closed) !== 0) { callback(); this[bunHTTP2StreamStatus] |= StreamState.FinalCalled; @@ -2000,13 +2022,7 @@ class Http2Stream extends Duplex { } } const chunk = Buffer.concat(chunks || []); - native.writeStream( - this.#id, - chunk, - undefined, - (this[bunHTTP2StreamStatus] & StreamState.EndedCalled) !== 0, - callback, - ); + native.writeStream(this.#id, chunk, undefined, false, callback); return; } } @@ -2019,13 +2035,7 @@ class Http2Stream extends Duplex { if (session) { const native = session[bunHTTP2Native]; if (native) { - native.writeStream( - this.#id, - chunk, - encoding, - (this[bunHTTP2StreamStatus] & StreamState.EndedCalled) !== 0, - callback, - ); + native.writeStream(this.#id, chunk, encoding, false, callback); return; } } diff --git a/src/js/node/net.ts b/src/js/node/net.ts index 749af39f41..917db8b582 100644 --- a/src/js/node/net.ts +++ b/src/js/node/net.ts @@ -889,6 +889,7 @@ Socket.prototype[kCloseRawConnection] = function () { }; Socket.prototype.connect = function connect(...args) { +<<<<<<< HEAD $debug("Socket.prototype.connect"); { const [options, connectListener] = @@ -899,6 +900,96 @@ Socket.prototype.connect = function connect(...args) { this.servername = options.servername; if (socket) { connection = socket; +======= + const [options, connectListener] = + $isArray(args[0]) && args[0][normalizedArgsSymbol] + ? // args have already been normalized. + // Normalized array is passed as the first and only argument. + ($assert(args[0].length == 2 && typeof args[0][0] === "object"), args[0]) + : normalizeArgs(args); + let connection = this[ksocket]; + let upgradeDuplex = false; + let { + fd, + port, + host, + path, + socket, + localAddress, + localPort, + rejectUnauthorized, + pauseOnConnect, + servername, + checkServerIdentity, + session, + } = options; + if (localAddress && !isIP(localAddress)) { + throw $ERR_INVALID_IP_ADDRESS(localAddress); + } + if (localPort) { + validateNumber(localPort, "options.localPort"); + } + this.servername = servername; + if (socket) { + connection = socket; + } + this.connecting = true; + + if (fd) { + bunConnect({ + data: this, + fd: fd, + socket: this[khandlers], + allowHalfOpen: this.allowHalfOpen, + }).catch(error => { + this.connecting = false; + if (!this.destroyed) { + this.emit("error", error); + this.emit("close"); + } + }); + } + this.pauseOnConnect = pauseOnConnect; + if (!pauseOnConnect) { + process.nextTick(() => { + this.resume(); + }); + } + if (fd) { + return this; + } + if ( + // TLSSocket already created a socket and is forwarding it here. This is a private API. + !(socket && $isObject(socket) && socket instanceof Duplex) && + // public api for net.Socket.connect + port === undefined && + path == null + ) { + throw $ERR_MISSING_ARGS(["options", "port", "path"]); + } + this.remotePort = port; + const bunTLS = this[bunTlsSymbol]; + var tls = undefined; + if (typeof bunTLS === "function") { + tls = bunTLS.$call(this, port, host, true); + // Client always request Cert + this._requestCert = true; + if (tls) { + if (typeof rejectUnauthorized !== "undefined") { + this._rejectUnauthorized = rejectUnauthorized; + tls.rejectUnauthorized = rejectUnauthorized; + } else { + this._rejectUnauthorized = tls.rejectUnauthorized; + } + tls.requestCert = true; + tls.session = session || tls.session; + this.servername = tls.servername; + tls.checkServerIdentity = checkServerIdentity || tls.checkServerIdentity; + this[bunTLSConnectOptions] = tls; + if (!connection && tls.socket) { + connection = tls.socket; + } +>>>>>>> 7e28dcb9e5 (more) } if (fd) { doConnect(this._handle, {