This commit is contained in:
Ciro Spaciari
2025-05-21 15:31:31 -07:00
parent f451ddb910
commit 822e3b8cf0
3 changed files with 120 additions and 21 deletions

View File

@@ -1262,9 +1262,6 @@ pub const H2FrameParser = struct {
this.windowSize += WINDOW_INCREMENT_SIZE * total_increment; // we will need at least this many increments to send all the streams
this.sendWindowUpdate(0, UInt31WithReserved.from(WINDOW_INCREMENT_SIZE * total_increment));
}
// incrementing the window size is a good time to flush
_ = this.flush();
}
pub fn setSettings(this: *H2FrameParser, settings: FullSettingsPayload) bool {
@@ -2387,6 +2384,7 @@ pub const H2FrameParser = struct {
}
this.readBuffer.reset();
this.remoteSettings = remoteSettings;
defer this.incrementWindowSizeIfNeeded();
if (remoteSettings.initialWindowSize >= this.remoteUsedWindowSize) {
defer _ = this.flushStreamQueue();
this.remoteWindowSize = remoteSettings.initialWindowSize;

View File

@@ -1879,10 +1879,23 @@ class Http2Stream extends Duplex {
validateFunction(callback, "callback");
this.once("close", callback);
}
this.rstCode = code;
const { ending } = this._writableState;
if (!ending) {
// If the writable side of the Http2Stream is still open, emit the
// 'aborted' event and set the aborted flag.
if (!this.aborted) {
this[kAborted] = true;
this.emit("aborted");
}
this.end();
}
markStreamClosed(this);
session[bunHTTP2Native]?.rstStream(this.#id, code);
if (this.writableFinished) {
this.rstCode = code;
session[bunHTTP2Native]?.rstStream(this.#id, code);
} else {
this.once("finish", rstNextTick.bind(session, this.#id, code));
}
}
}
_destroy(err, callback) {
@@ -1897,7 +1910,7 @@ class Http2Stream extends Duplex {
}
// at this state destroyed will be true but we need to close the writable side
this._writableState.destroyed = false;
this.end(); // why this is needed?
this.end();
// we now restore the destroyed flag
this._writableState.destroyed = true;
}
@@ -1942,6 +1955,15 @@ class Http2Stream extends Duplex {
_final(callback) {
const status = this[bunHTTP2StreamStatus];
const session = this[bunHTTP2Session];
if (session) {
const native = session[bunHTTP2Native];
if (native) {
this[bunHTTP2StreamStatus] |= StreamState.FinalCalled;
native.writeStream(this.#id, "", "ascii", true, callback);
return;
}
}
if ((status & StreamState.WritableClosed) !== 0 || (status & StreamState.Closed) !== 0) {
callback();
this[bunHTTP2StreamStatus] |= StreamState.FinalCalled;
@@ -2000,13 +2022,7 @@ class Http2Stream extends Duplex {
}
}
const chunk = Buffer.concat(chunks || []);
native.writeStream(
this.#id,
chunk,
undefined,
(this[bunHTTP2StreamStatus] & StreamState.EndedCalled) !== 0,
callback,
);
native.writeStream(this.#id, chunk, undefined, false, callback);
return;
}
}
@@ -2019,13 +2035,7 @@ class Http2Stream extends Duplex {
if (session) {
const native = session[bunHTTP2Native];
if (native) {
native.writeStream(
this.#id,
chunk,
encoding,
(this[bunHTTP2StreamStatus] & StreamState.EndedCalled) !== 0,
callback,
);
native.writeStream(this.#id, chunk, encoding, false, callback);
return;
}
}

View File

@@ -889,6 +889,7 @@ Socket.prototype[kCloseRawConnection] = function () {
};
Socket.prototype.connect = function connect(...args) {
<<<<<<< HEAD
$debug("Socket.prototype.connect");
{
const [options, connectListener] =
@@ -899,6 +900,96 @@ Socket.prototype.connect = function connect(...args) {
this.servername = options.servername;
if (socket) {
connection = socket;
=======
const [options, connectListener] =
$isArray(args[0]) && args[0][normalizedArgsSymbol]
? // args have already been normalized.
// Normalized array is passed as the first and only argument.
($assert(args[0].length == 2 && typeof args[0][0] === "object"), args[0])
: normalizeArgs(args);
let connection = this[ksocket];
let upgradeDuplex = false;
let {
fd,
port,
host,
path,
socket,
localAddress,
localPort,
rejectUnauthorized,
pauseOnConnect,
servername,
checkServerIdentity,
session,
} = options;
if (localAddress && !isIP(localAddress)) {
throw $ERR_INVALID_IP_ADDRESS(localAddress);
}
if (localPort) {
validateNumber(localPort, "options.localPort");
}
this.servername = servername;
if (socket) {
connection = socket;
}
this.connecting = true;
if (fd) {
bunConnect({
data: this,
fd: fd,
socket: this[khandlers],
allowHalfOpen: this.allowHalfOpen,
}).catch(error => {
this.connecting = false;
if (!this.destroyed) {
this.emit("error", error);
this.emit("close");
}
});
}
this.pauseOnConnect = pauseOnConnect;
if (!pauseOnConnect) {
process.nextTick(() => {
this.resume();
});
}
if (fd) {
return this;
}
if (
// TLSSocket already created a socket and is forwarding it here. This is a private API.
!(socket && $isObject(socket) && socket instanceof Duplex) &&
// public api for net.Socket.connect
port === undefined &&
path == null
) {
throw $ERR_MISSING_ARGS(["options", "port", "path"]);
}
this.remotePort = port;
const bunTLS = this[bunTlsSymbol];
var tls = undefined;
if (typeof bunTLS === "function") {
tls = bunTLS.$call(this, port, host, true);
// Client always request Cert
this._requestCert = true;
if (tls) {
if (typeof rejectUnauthorized !== "undefined") {
this._rejectUnauthorized = rejectUnauthorized;
tls.rejectUnauthorized = rejectUnauthorized;
} else {
this._rejectUnauthorized = tls.rejectUnauthorized;
}
tls.requestCert = true;
tls.session = session || tls.session;
this.servername = tls.servername;
tls.checkServerIdentity = checkServerIdentity || tls.checkServerIdentity;
this[bunTLSConnectOptions] = tls;
if (!connection && tls.socket) {
connection = tls.socket;
}
>>>>>>> 7e28dcb9e5 (more)
}
if (fd) {
doConnect(this._handle, {